轻量消息队列(原MNS)消费Demo

本文为您提供当前主流编程语言的消息回执示例,您可通过下载相应的SDK安装包拉取队列消息。

相关下载

说明

SDK下载链接为依赖语言的云通信SDK核心库及dybaseapi,其中dybaseapi包用于拉取队列消息,除JavaNode.js外,其他语言需单独安装阿里云V2 SDK。

所属语言

SDK下载链接

Java

Java SDK

Node.js

Node.js V2 SDK

.NET/C#

  • 轻量消息队列(原MNS)客户端SDK

    说明

    在安装轻量消息队列(原MNS)客户端SDK时,您需要单独下载C# SDK包,并打包成.dll文件,或直接使用已打包好的Aliyun.MNS.dll文件,在云通信SDK包中的csproj文件中添加如下内容:

    <ItemGroup>
          <PackageReference Include="AlibabaCloud.SDK.Dybaseapi20170525" Version="1.0.1" />
          <Reference Include="AliyunSDK_MNS">
              <HintPath>path\to\AliyunSDK_MNS.dll</HintPath>//此处地址需替换成您打包/下载后.dll文件的地址
              <Private>true</Private>
          </Reference>
     </ItemGroup>
  • 云通信SDK:dybaseapi(.NET)

  • 安装方式:

    # dybaseapi SDK:
    dotnet add package AlibabaCloud.SDK.Dybaseapi20170525

Python

  • 轻量消息队列(原MNS)客户端SDK:Python SDK

  • 云通信SDK:dybaseapi(Python)

  • 安装方式:

    # 轻量消息队列MNS 客户端 SDK:
    pip install aliyun-mns-sdk
    # dybaseapi SDK:
    pip install alibabacloud_dybaseapi20170525

Go

  • 轻量消息队列(原MNS)客户端SDK:Go SDK

  • 云通信SDK:dybaseapi(Go)

  • 安装方式:

    # 轻量消息队列MNS 客户端 SDK:
    go get github.com/aliyun/aliyun-mns-go-sdk
    # dybaseapi SDK:
    go get github.com/alibabacloud-go/dybaseapi-20170525

PHP

  • 轻量消息队列(原MNS)客户端SDK:PHP SDK

  • 云通信SDK:dybaseapi(PHP)

  • 安装方式:

    # 轻量消息队列MNS 客户端 SDK:
    composer require aliyun/aliyun-mns-php-sdk -W
    # dybaseapi SDK:
    composer require alibabacloud/dybaseapi-20170525

注意事项

在使用示例Demo时,请注意如下信息。以Java语言为例,其他语言Demo注意事项可做参考。

  • 配置AccessKey IDAccessKey Secret信息。

    为避免在代码中硬编码访问密钥(AccessKey)而造成泄露,请通过配置环境变量的方式获取AccessKey。环境变量配置方法,请参见Linux、macOSWindows系统配置环境变量

    本文以环境变量名ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET为例,进行后续操作。通过环境变量获取AccessKey的代码示例如下:

    String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
  • messageType替换为您需要的消息类型,如订阅短信服务国内消息的状态报告接收(SmsReport)。短信服务支持的回执消息类型,请参见回执消息配置

    String messageType="messageType";
  • queueName是消息队列名称,以状态报告接收为例,您可以在短信服务控制台 > 通用设置 > 回执消息界面查看。

    String queueName="queueName";
  • Java示例代码中您获取到的上行信息内容由dealMessage方法处理,您可以将需要的上行信息内容业务逻辑写在该方法中。arg代表回执消息体参数,可填写的值如:start_time、end_time、duration、status_code等,建议您根据具体场景填写参数。

    // 根据文档中具体的消息格式进行消息体的解析
    String arg = (String) contentMap.get("arg");
    // 编写您的业务代码

示例Demo

Java Demo

package com.alicom.mns.sample;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import com.alicom.mns.tools.DefaultAlicomMessagePuller;
import com.alicom.mns.tools.MessageListener;
import com.aliyun.mns.model.Message;
import com.google.gson.Gson;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 只能用于接收云通信的消息,不能用于接收其他业务的消息
 * 短信上行消息接收demo
 */
public class ReceiveDemo {

    private static Logger logger = Logger.getLogger(ReceiveDemo.class.getName());

    static class MyMessageListener implements MessageListener {
        private Gson gson = new Gson();

        @Override
        public boolean dealMessage(Message message) {

            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //消息的几个关键值
            System.out.println("message receiver time from mns:" + format.format(new Date()));
            System.out.println("message handle: " + message.getReceiptHandle());
            System.out.println("message body: " + message.getMessageBodyAsString());
            System.out.println("message id: " + message.getMessageId());
            System.out.println("message dequeue count:" + message.getDequeueCount());
            System.out.println("Thread:" + Thread.currentThread().getName());
            try {
                Map<String, Object> contentMap = gson.fromJson(message.getMessageBodyAsString(), HashMap.class);

                //TODO 根据文档中具体的消息格式进行消息体的解析
                String arg = (String) contentMap.get("arg");

                //TODO 这里开始编写您的业务代码

            } catch (com.google.gson.JsonSyntaxException e) {
                logger.log(Level.SEVERE, "error_json_format:" + message.getMessageBodyAsString(), e);
                //理论上不会出现格式错误的情况,所以遇见格式错误的消息,只能先delete,否则重新推送也会一直报错
                return true;
            } catch (Throwable e) {
                //您自己的代码部分导致的异常,应该return false,这样消息不会被delete掉,而会根据策略进行重推
                return false;
            }

            //消息处理成功,返回true, SDK将调用MNS的delete方法将消息从队列中删除掉
            return true;
        }

    }

    public static void main(String[] args) throws Exception, ParseException {

        DefaultAlicomMessagePuller puller = new DefaultAlicomMessagePuller();

        //设置异步线程池大小及任务队列的大小,还有无数据线程休眠时间
        puller.setConsumeMinThreadSize(6);
        puller.setConsumeMaxThreadSize(16);
        puller.setThreadQueueSize(200);
        puller.setPullMsgThreadSize(1);
        //和服务端联调问题时开启,平时无需开启,消耗性能
        puller.openDebugLog(false);

        //TODO 此处为从操作系统中获取的AK信息
        String accessKeyId=System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
	String accessKeySecret=System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        /*
         * TODO 将messageType和queueName替换成您需要的消息类型名称和对应的队列名称
         * 短信服务下所有的回执消息类型请参见如下地址:
         * https://help.aliyun.com/zh/sms/developer-reference/configure-delivery-receipts-1
         */

        //此处应该替换成相应产品的消息类型
        String messageType = "messageType";
        //在云通信页面开通相应业务消息后,就能在页面上获得对应的queueName,格式类似:Alicom-Queue-******-SmsReport
        String queueName = "queueName";

        puller.startReceiveMsg(accessKeyId, accessKeySecret, messageType, queueName, new MyMessageListener());
    }


}

Python Demo

import os
import time

from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dybaseapi20170525.client import Client as OpenApiClient
from alibabacloud_dybaseapi20170525.models import QueryTokenForMnsQueueRequest

from datetime import datetime
from mns.account import Account
from mns.queue import *
from mns.mns_exception import *

try:
    import json
except ImportError:
    import simplejson as json

# TODO 需要替换成您需要接收的消息类型
# 短信服务下所有的回执消息类型请参见如下地址:
# https://help.aliyun.com/zh/sms/developer-reference/configure-delivery-receipts-1
message_type = "SmsReport"
# TODO 需要替换成您的队列名称。在云通信页面开通相应业务消息后,就能在页面上获得对应的queue_name
queue_name = f"Alicom-Queue-******-SmsReport"

# 云通信固定endpoint地址,请不要修改
endpoint = f"https://1943695596114318.mns.cn-hangzhou.aliyuncs.com"

config = open_api_models.Config(
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# 设置服务接入点
config.endpoint = f'dybaseapi.aliyuncs.com'
client = OpenApiClient(config)


# 云通信业务token存在失效时间,需动态更新。
class Token():
    def __init__(self):
        self.token = None
        self.tmp_access_id = None
        self.tmp_access_key = None
        self.expire_time = None

    def is_refresh(self):
        if self.expire_time is None:
            return 1
        # 失效时间与当前系统时间比较,提前2分钟刷新token
        now = datetime.now()
        expire = datetime.strptime(self.expire_time, "%Y-%m-%d %H:%M:%S")
        if expire <= now or (expire - now).seconds < 120:
            return 1
        return 0

    def refresh(self):
        print("start refresh token...")

        request = QueryTokenForMnsQueueRequest()
        request.message_type = message_type
        request.queue_name = queue_name

        runtime = util_models.RuntimeOptions()
        response = client.query_token_for_mns_queue_with_options(request, runtime)
        # print response
        if response is None:
            raise Exception("GET_TOKEN_FAIL", "获取token时无响应")

        response_body = response.body
        if response_body.code != "OK":
            raise Exception("GET_TOKEN_FAIL", "获取token失败")

        sts_token = response_body.message_token_dto
        self.tmp_access_key = sts_token.access_key_secret
        self.tmp_access_id = sts_token.access_key_id
        self.expire_time = sts_token.expire_time
        self.token = sts_token.security_token

        print("finish refresh token...")


# 初始化 token, my_account, my_queue
token, my_account, my_queue = Token(), None, None

# 循环读取删除消息直到队列空
# receive message请求使用long polling方式,通过wait_seconds指定长轮询时间为3秒

# long polling 解析:
## 当队列中有消息时,请求立即返回;
## 当队列中没有消息时,请求在MNS服务器端挂3秒钟,在这期间,有消息写入队列,请求会立即返回消息,3秒后,请求返回队列没有消息;

wait_seconds = 3
print("%sReceive And Delete Message From Queue%s\nQueueName:%s\nWaitSeconds:%s\n" % (
    10 * "=", 10 * "=", queue_name, wait_seconds))

while True:
    receipt_handles = []
    # 读取消息
    try:
        # token过期是否需要刷新
        if token.is_refresh() == 1:
            # 刷新token
            token.refresh()
            if my_account:
                my_account.mns_client.close_connection()
                my_account = None
        if not my_account:
            my_account = Account(endpoint, token.tmp_access_id, token.tmp_access_key, token.token)
            my_queue = my_account.get_queue(queue_name)
            my_queue.batch_receive_message(10, wait_seconds)

        # 接收消息
        recv_msgs = my_queue.batch_receive_message(10, wait_seconds)
        print(recv_msgs)

        for recv_msg in recv_msgs:
            # TODO 业务处理

            # receipt_handles.append(recv_msg.receipt_handle)
            print("Receive Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" % (
                recv_msg.receipt_handle, recv_msg.message_body, recv_msg.message_id))

    except MNSExceptionBase as e:
        if e.type == "QueueNotExist":
            print("Queue not exist, please create queue before receive message.")
            break
        elif e.type == "MessageNotExist":
            print("Queue is empty! sleep 10s")
            time.sleep(10)
            continue
        print("Receive Message Fail! Exception:%s\n" % e)
        break

    # 删除消息
    try:
        if len(receipt_handles) > 0:
            # my_queue.batch_delete_message(receipt_handles)
            print("Delete Message Succeed!  ReceiptHandles:%s" % receipt_handles)
    except MNSExceptionBase as e:
        print("Delete Message Fail! Exception:%s\n" % e)

C# Demo

using Newtonsoft.Json;
using AlibabaCloud.SDK.Dybaseapi20170525;
using AlibabaCloud.SDK.Dybaseapi20170525.Models;
using Aliyun.MNS.Model;
using Aliyun.MNS;
using System.Text;

namespace AlibabaCloud.SDK.Sample
{
    public class Sample
    {
        

        public static void Main(string[] args)
        {
            AlibabaCloud.OpenApiClient.Models.Config config = new AlibabaCloud.OpenApiClient.Models.Config
                {
                    // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
                    AccessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID"),
                    // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
                    AccessKeySecret = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
                };
            config.Endpoint = "dybaseapi.aliyuncs.com";
            Client client = new Client(config);

            // 补充队列名称,在云通信页面开通相应业务消息后,就能在页面上获得对应的queueName
            String queueName = "Alicom-Queue-******-SmsReport";
            
            // 补充消息类型,短信服务下所有的回执消息类型请参见如下地址:
            // https://help.aliyun.com/zh/sms/developer-reference/configure-delivery-receipts-1
            String messageType = "SmsReport"; 
            


            int maxThread = 2;
            for (int i = 0; i < maxThread; i++)
            {
                TestTask testTask = new TestTask("PullMessageTask-thread-" + i, messageType, queueName, client);
                Thread t = new Thread(new ThreadStart(testTask.Handle));
                //启动线程
                t.Start();
            }
            Console.ReadKey();

        }
    }
    class TestTask
    {
        object o = new object();
        const int sleepTime = 50;
        const long bufferTime = 60 * 2; // 过期时间小于2分钟则重新获取,防止服务器时间误差
        // 云通信固定endpoint地址,请不要修改
        const String mnsAccountEndpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/";

        public String name { get; private set; }
        public String messageType { get; private set; }
        public String QueueName { get; private set; }
        public int TaskID { get; private set; }
        public Client DybaseapiClient { get; private set; }

        public TestTask(String name, String messageType, String queueName, Client dybaseapiClient)
        {
            this.name = name;
            this.messageType = messageType;
            this.QueueName = queueName;
            this.DybaseapiClient = dybaseapiClient;
        }

        readonly Dictionary<string, QueryTokenForMnsQueueResponseBody.QueryTokenForMnsQueueResponseBodyMessageTokenDTO> tokenMap = new Dictionary<string, QueryTokenForMnsQueueResponseBody.QueryTokenForMnsQueueResponseBodyMessageTokenDTO>();
        readonly Dictionary<string, Queue> queueMap = new Dictionary<string, Queue>();

        public QueryTokenForMnsQueueResponseBody.QueryTokenForMnsQueueResponseBodyMessageTokenDTO GetTokenByMessageType(Client dybaseapiClient, String messageType, String queueName)
        {
            var request = new QueryTokenForMnsQueueRequest
            {
                MessageType = messageType,
                QueueName = queueName
            };
            var queryTokenForMnsQueueResponse = dybaseapiClient.QueryTokenForMnsQueue(request);
            var token = queryTokenForMnsQueueResponse.Body.MessageTokenDTO;
            return token;
        }

        /// 处理消息
        public void Handle()
        {
            while (true)
            {
                try
                {
                    QueryTokenForMnsQueueResponseBody.QueryTokenForMnsQueueResponseBodyMessageTokenDTO token = null;
                    Queue queue = null;
                    lock (o)
                    {
                        if (tokenMap.ContainsKey(messageType))
                        {
                            token = tokenMap[messageType];
                        }

                        if (queueMap.ContainsKey(QueueName))
                        {
                            queue = queueMap[QueueName];
                        }

                        TimeSpan ts = new TimeSpan(0);

                        if (token != null)
                        {
                            DateTime b = Convert.ToDateTime(token.ExpireTime);
                            DateTime c = Convert.ToDateTime(DateTime.Now);
                            ts = b - c;
                        }

                        if (token == null || ts.TotalSeconds < bufferTime || queue == null)
                        {
                            token = GetTokenByMessageType(DybaseapiClient, messageType, QueueName);

                            var client = new MNSClient(token.AccessKeyId, token.AccessKeySecret, mnsAccountEndpoint, token.SecurityToken);
                            queue = client.GetNativeQueue(QueueName);
                            if (tokenMap.ContainsKey(messageType))
                            {
                                tokenMap.Remove(messageType);
                            }
                            if (queueMap.ContainsKey(QueueName))
                            {
                                queueMap.Remove(QueueName);
                            }
                            tokenMap.Add(messageType, token);
                            queueMap.Add(QueueName, queue);
                        }
                    }

                    BatchReceiveMessageResponse batchReceiveMessageResponse = queue.BatchReceiveMessage(16);
                    List<Message> messages = batchReceiveMessageResponse.Messages;

                    for (int i = 0; i <= messages.Count - 1; i++)
                    {
                        try
                        {
                            byte[] outputb = Convert.FromBase64String(messages[i].Body);
                            string orgStr = Encoding.UTF8.GetString(outputb);
                            Console.WriteLine(orgStr);
                            // TODO 具体消费逻辑,待客户自己实现.
                            // 消费成功的前提下删除消息
                            // queue.DeleteMessage(messages[i].ReceiptHandle);
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(e.ToString());
                        }
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.ToString());
                }
                Thread.Sleep(sleepTime);
            }
        }
    }
}

PHP Demo

<?php
use AlibabaCloud\Tea\Utils\Utils\RuntimeOptions;
use Darabonba\OpenApi\Models\Config;
use AlibabaCloud\SDK\Dybaseapi\V20170525\Dybaseapi;
use AlibabaCloud\SDK\Dybaseapi\V20170525\Models\QueryTokenForMnsQueueRequest;
use Exception;

use AliyunMNS\Client;
use AliyunMNS\Requests\BatchReceiveMessageRequest;
use AliyunMNS\Requests\BatchDeleteMessageRequest;

require_once 'vendor/autoload.php';

// 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
// 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
// 本示例以把AccessKey ID和AccessKey Secret保存在环境变量为例说明,来实现API访问的身份验证。
$config = new Config([
    // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
    "accessKeyId" => getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
    // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
    "accessKeySecret" => getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
]);
$config->endpoint = "dybaseapi.aliyuncs.com";
$client = new Dybaseapi($config);

// 队列名称,需替换为您的队列名称
// 在云通信页面开通相应业务消息后,就能在页面上获得对应的queueName,格式类似:Alicom-Queue-******-SmsReport
$queueName = 'Alicom-Queue-*******-SmsReport'; 
// 需要接收的消息类型。短信服务下所有的回执消息类型请参见如下地址:
// https://help.aliyun.com/zh/sms/developer-reference/configure-delivery-receipts-1
$messageType = 'SmsReport'; 

$response = null;
$token = null;
$i = 0;

do {
    try {
        if (null == $token || strtotime($token->expireTime) - time() > 2 * 60) {
            $request = new QueryTokenForMnsQueueRequest([
                'messageType' => $messageType,
                'queueName' => $queueName,
            ]);

            $runtime = new RuntimeOptions([]);
            $response = $client->queryTokenForMnsQueueWithOptions($request, $runtime);
            $token = $response->body;
        }

        $token = $response->body->messageTokenDTO;

        echo "receive message\n";
        $mnsClient = new Client(
            "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com",  // 云通信固定endpoint地址,请不要修改
            $token->accessKeyId,
            $token->accessKeySecret,
            $token->securityToken
        );
        $queue = $mnsClient->getQueueRef($queueName);

        $mnsRequest = new BatchReceiveMessageRequest(10, 5);
        $mnsRequest->setQueueName($queueName);
        $mnsResponse = $queue->batchReceiveMessage($mnsRequest);

        $receiptHandles = [];
        foreach ($mnsResponse->getMessages() as $message) {
            echo $message->getMessageBody() . "\n";
            // 用户逻辑:
            // $receiptHandles[] = $message->ReceiptHandle; // 加入$receiptHandles数组中的记录将会被删除
        }

        if (count($receiptHandles) > 0) {
            $deleteRequest = new BatchDeleteMessageRequest($queueName, $receiptHandles);
            $queue->batchDeleteMessage($deleteRequest);
        }
    } catch (Exception $e) {
         echo $e . PHP_EOL;
        if ($e->getCode() == 404) {
            $i++;
        }
        echo $e . PHP_EOL;
    }
} while ($i < 3);

Go Demo

package main

import (
	"encoding/base64"
	"fmt"
	"os"
	"time"

	openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
	dybaseapiClient "github.com/alibabacloud-go/dybaseapi-20170525/client"
	"github.com/alibabacloud-go/tea/tea"
	mns "github.com/aliyun/aliyun-mns-go-sdk"
)

const (
	// 云通信固定endpoint地址,请不要修改
	mnsDomain = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com"
	// 队列名称,需修改为您需要接收消息的队列名称
	// 在云通信页面开通相应业务消息后,就能在页面上获得对应的queueName,格式类似:Alicom-Queue-******-SmsReport
	queueName = "Alicom-Queue-******-SmsReport"
	// 消息类型,需修改为您接收的消息类型,短信服务下所有的回执消息类型请参见如下地址:
	// https://help.aliyun.com/zh/sms/developer-reference/configure-delivery-receipts-1
	messageType = "SmsReport"
)

func main() {
	// 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
	// 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
	// 本示例以把AccessKey ID和AccessKey Secret保存在环境变量为例说明,来实现API访问的身份验证。
	// 创建client实例
	config := &openapi.Config{
		// 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
		AccessKeyId: tea.String(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")),
		// 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
		AccessKeySecret: tea.String(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")),
	}
	config.Endpoint = tea.String("dybaseapi.aliyuncs.com")
	client, _err := dybaseapiClient.NewClient(config)
	if _err != nil {
		panic(_err)
	}

	cstLoc, _ := time.LoadLocation("Asia/Shanghai")
	var token *dybaseapiClient.QueryTokenForMnsQueueResponseBodyMessageTokenDTO
	var expireTime time.Time

	// 循环读取消息
	for {
		// token过期则重新获取
		if token == nil || expireTime.Unix()-time.Now().Unix() < 2*60 {
			request := &dybaseapiClient.QueryTokenForMnsQueueRequest{}
			request.MessageType = tea.String(messageType)
			request.QueueName = tea.String(queueName)
			response, err := client.QueryTokenForMnsQueue(request)
			if err != nil {
				panic(err)
			}
			token = response.Body.MessageTokenDTO
			expireTime, err = time.ParseInLocation("2006-01-02 15:04:05", *token.ExpireTime, cstLoc)
			if err != nil {
				panic(err)
			}
		}

		clientConfig := mns.AliMNSClientConfig{
			EndPoint:        mnsDomain,
			AccessKeyId:     *token.AccessKeyId,
			AccessKeySecret: *token.AccessKeySecret,
			Token:           *token.SecurityToken,
		}
		mnsClient := mns.NewAliMNSClientWithConfig(clientConfig)
		queue := mns.NewMNSQueue(queueName, mnsClient)

		endChan := make(chan int)
		respChan := make(chan mns.BatchMessageReceiveResponse)
		errChan := make(chan error)
		// 消息处理协程
		go func() {
			select {
			case resp := <-respChan:
				{
					// 消息响应处理 当通过 respChan 接收到消息时:
					// fmt.Printf("response: %+v\n", resp)

					// 处理每条消息
					for _, msg := range resp.Messages {
						// 解析消息
						messageBody, decodeErr := base64.StdEncoding.DecodeString(msg.MessageBody)
						if decodeErr != nil {
							panic(decodeErr)
						}
						fmt.Println("message body: ", string(messageBody))
						// todo 业务处理

						if ret, e := queue.ChangeMessageVisibility(msg.ReceiptHandle, 5); e != nil {
							fmt.Printf("ChangeMessageVisibility: %+v\n", e)
						} else {
							fmt.Printf("visibility changed: %+v\n", ret)
							fmt.Println("delete it now: ", ret.ReceiptHandle)

							// 在可见性更改后,使用 queue.DeleteMessage 删除消息
							if e := queue.DeleteMessage(ret.ReceiptHandle); e != nil {
								fmt.Println(e)
							}
						}
					}

					endChan <- 1
				}
			case <-errChan:
				{
					// todo 错误处理,当通过 errChan 接收到错误时:
					// fmt.Println(err)

					endChan <- 1
				}
			}
		}()

		// 消息接收
		queue.BatchReceiveMessage(respChan, errChan, 10, 5)
		<-endChan
	}
}

常见问题

Mac环境下集成Python SDK出现“SSL: CERTIFICATE_VERIFY_FAILED”异常是什么原因?(websocket closed due to [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1000))

在连接WebSocket时,可能会遇到OpenSSL验证证书失败的问题,提示找不到证书。这通常是由于Python环境的证书配置不正确导致的。可以通过以下步骤手动定位并修复证书问题:

  1. 导出系统证书并设置环境变量:执行以下命令,将macOS系统中的所有证书导出到一个文件,并将其设置为Python和相关库的默认证书路径。

    security find-certificate -a -p > ~/all_mac_certs.pem
    export SSL_CERT_FILE=~/all_mac_certs.pem
    export REQUESTS_CA_BUNDLE=~/all_mac_certs.pem
  2. 创建符号链接以修复PythonOpenSSL配置:如果PythonOpenSSL配置缺失证书,可以通过以下命令手动创建符号链接。请确保替换命令中的路径为本地Python版本的实际安装目录。

    # 3.9是示例版本号,请根据您本地安装的 Python 版本调整路径ln -s /etc/ssl/* /Library/Frameworks/Python.framework/Versions/3.9/etc/openssl
  3. 重新启动终端并清除缓存:完成上述操作后,请关闭并重新打开终端,以确保环境变量生效。清除可能的缓存后重试连接WebSocket。

通过以上步骤,可以解决因证书配置错误导致的连接问题。如果问题仍未解决,请检查目标服务器的证书配置是否正确。