Lightweight Message Queue (formerly MNS) consumption demos

更新时间:
复制 MD 格式

This topic provides code samples for receiving messages in popular programming languages. You can download the corresponding SDK packages to pull messages from a queue.

Downloads

Note

The SDK download links provide the Alibaba Cloud Communications SDK core library for each supported language and the dybaseapi package. The dybaseapi package is used to pull messages from a queue. For languages other than Java and Node.js, you must also install the Alibaba Cloud V2 SDK separately.

Language

SDK download link

Java

Java SDK

Node.js

Node.js V2 SDK

.NET/C#

  • Lightweight Message Queue (formerly MNS) client SDK

    Note

    When you install the Lightweight Message Queue (formerly MNS) client SDK, download the C# SDK package and package it into a .dll file. Alternatively, use the pre-packaged Aliyun.MNS.dll file. Then, add the following content to the csproj file in the Alibaba Cloud Communications SDK package:

    <ItemGroup>
          <PackageReference Include="AlibabaCloud.SDK.Dybaseapi20170525" Version="1.0.1" />
          <Reference Include="AliyunSDK_MNS">
              <HintPath>path\to\AliyunSDK_MNS.dll</HintPath>// Replace this with the path to the .dll file that you packaged or downloaded.
              <Private>true</Private>
          </Reference>
     </ItemGroup>
  • Alibaba Cloud Communications SDK: dybaseapi (.NET)

  • Installation method:

    # dybaseapi SDK:
    dotnet add package AlibabaCloud.SDK.Dybaseapi20170525

Python

  • Lightweight Message Queue (formerly MNS) client SDK: Python SDK

  • Alibaba Cloud Communications SDK: dybaseapi (Python)

  • Installation method:

    # Lightweight Message Queue (MNS) client SDK:
    pip install aliyun-mns-sdk
    # dybaseapi SDK:
    pip install alibabacloud_dybaseapi20170525

Go

  • Lightweight Message Queue (formerly MNS) client SDK: Go SDK

  • Alibaba Cloud Communications SDK: dybaseapi (Go)

  • Installation method:

    # Lightweight Message Queue (MNS) client SDK:
    go get github.com/aliyun/aliyun-mns-go-sdk
    # dybaseapi SDK:
    go get github.com/alibabacloud-go/dybaseapi-20170525

PHP

  • Lightweight Message Queue (formerly MNS) client SDK: PHP SDK

  • Alibaba Cloud Communications SDK: dybaseapi (PHP)

  • Installation method:

    # Lightweight Message Queue (MNS) client SDK:
    composer require aliyun/aliyun-mns-php-sdk -W
    # dybaseapi SDK:
    composer require alibabacloud/dybaseapi-20170525

Notes

When you use the sample demos, note the following. The Java language is used as an example, but the notes are also applicable to the other languages.

  • Configure your AccessKey ID and AccessKey secret.

    To prevent your AccessKey pair from being leaked, do not hard-code it in your code. We recommend that you obtain the AccessKey pair by configuring environment variables. For more information, see Configure environment variables on Linux, macOS, and Windows systems.

    This topic uses the environment variable names ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET as examples. The following code shows how to obtain the AccessKey pair from environment variables:

    String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
  • Replace messageType with the required message type, such as the recording message (LlmSmartCallRecord) from the Communication Intelligence Engine. For more information about the receipt message types supported by Artificial Intelligence Cloud Call Service, see Introduction to receipt messages and the configuration process.

    String messageType="messageType";
  • queueName is the queue name. For example, for the Communication Intelligence Engine, you can view the queue name on the Receipt Message Configuration page. To go to the page, choose Large Model Communication > Communication Intelligence Engine in the Artificial Intelligence Cloud Call Service console.

    String queueName="queueName";
  • In the sample Java code, incoming message content is processed by the dealMessage method. You can add your business logic for processing the message content in this method. The arg parameter represents the receipt message body, which contains parameters such as start_time, end_time, duration, and status_code. We recommend that you process the parameters that are relevant to your scenario.

    // Parse the message body based on the specific message format in the documentation.
    String arg = (String) contentMap.get("arg");
    // Write your business logic.

Sample demos

Java Demo

package com.alicom.mns.sample;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import com.alicom.mns.tools.DefaultAlicomMessagePuller;
import com.alicom.mns.tools.MessageListener;
import com.aliyun.mns.model.Message;
import com.google.gson.Gson;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * This can only be used to receive messages from Alibaba Cloud Communications. It cannot be used to receive messages from other services.
 * Demo for receiving incoming messages.
 */
public class ReceiveDemo {

    private static Logger logger = Logger.getLogger(ReceiveDemo.class.getName());

    static class MyMessageListener implements MessageListener {
        private Gson gson = new Gson();

        @Override
        public boolean dealMessage(Message message) {

            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            // Key values of the message
            System.out.println("message receiver time from mns:" + format.format(new Date()));
            System.out.println("message handle: " + message.getReceiptHandle());
            System.out.println("message body: " + message.getMessageBodyAsString());
            System.out.println("message id: " + message.getMessageId());
            System.out.println("message dequeue count:" + message.getDequeueCount());
            System.out.println("Thread:" + Thread.currentThread().getName());
            try {
                Map<String, Object> contentMap = gson.fromJson(message.getMessageBodyAsString(), HashMap.class);

                //TODO: Parse the message body based on the specific message format in the documentation.
                String arg = (String) contentMap.get("arg");

                //TODO: Start writing your business logic here.

            } catch (com.google.gson.JsonSyntaxException e) {
                logger.log(Level.SEVERE, "error_json_format:" + message.getMessageBodyAsString(), e);
                // In theory, format errors do not occur. If a format error occurs, you can only delete the message. Otherwise, an error is continuously reported upon redelivery.
                return true;
            } catch (Throwable e) {
                // If an exception is caused by your code, return false. This way, the message is not deleted and is redelivered based on the policy.
                return false;
            }

            // If the message is processed successfully, return true. The SDK calls the MNS delete method to delete the message from the queue.
            return true;
        }

    }

    public static void main(String[] args) throws Exception, ParseException {

        DefaultAlicomMessagePuller puller = new DefaultAlicomMessagePuller();

        // Set the size of the asynchronous thread pool, the size of the task queue, and the hibernation time for threads when no data is available.
        puller.setConsumeMinThreadSize(6);
        puller.setConsumeMaxThreadSize(16);
        puller.setThreadQueueSize(200);
        puller.setPullMsgThreadSize(1);
        // Enable this for joint debugging with the server. Do not enable it for normal use because it affects performance.
        puller.openDebugLog(false);

        //TODO: This is the AccessKey information obtained from the operating system.
        String accessKeyId=System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
	String accessKeySecret=System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        /*
         * TODO: Replace messageType and queueName with the required message type name and the corresponding queue name.
         * For all receipt message types in Artificial Intelligence Cloud Call Service, see the following URL:
         * https://help.aliyun.com/document_detail/2103608.html
         */

        // Replace this with the message type of the corresponding product.
        String messageType = "messageType";
        // After you enable the corresponding message service on the Alibaba Cloud Communications page, you can obtain the corresponding queueName from the page. The format is similar to Alicom-Queue-******-LlmSmartallReport.
        String queueName = "queueName";

        puller.startReceiveMsg(accessKeyId, accessKeySecret, messageType, queueName, new MyMessageListener());
    }


}

Python Demo

import os
import time

from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dybaseapi20170525.client import Client as OpenApiClient
from alibabacloud_dybaseapi20170525.models import QueryTokenForMnsQueueRequest

from datetime import datetime
from mns.account import Account
from mns.queue import *
from mns.mns_exception import *

try:
    import json
except ImportError:
    import simplejson as json

# TODO: Replace this with the message type that you want to receive.
# For all receipt message types in Artificial Intelligence Cloud Call Service, see the following URL:
# https://help.aliyun.com/document_detail/2103608.html
message_type = "LlmSmartallReport"
# TODO: Replace this with your queue name. After you enable the corresponding message service on the Alibaba Cloud Communications page, you can obtain the corresponding queue_name from the page.
queue_name = f"Alicom-Queue-******-LlmSmartallReport"

# This is a fixed endpoint for Alibaba Cloud Communications. Do not modify it.
endpoint = f"https://1943695596114318.mns.cn-hangzhou.aliyuncs.com"

config = open_api_models.Config(
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Set the endpoint.
config.endpoint = f'dybaseapi.aliyuncs.com'
client = OpenApiClient(config)


# The service token for Alibaba Cloud Communications has an expiration time and must be dynamically updated.
class Token():
    def __init__(self):
        self.token = None
        self.tmp_access_id = None
        self.tmp_access_key = None
        self.expire_time = None

    def is_refresh(self):
        if self.expire_time is None:
            return 1
        # Compare the expiration time with the current system time. Refresh the token 2 minutes in advance.
        now = datetime.now()
        expire = datetime.strptime(self.expire_time, "%Y-%m-%d %H:%M:%S")
        if expire <= now or (expire - now).seconds < 120:
            return 1
        return 0

    def refresh(self):
        print("start refresh token...")

        request = QueryTokenForMnsQueueRequest()
        request.message_type = message_type
        request.queue_name = queue_name

        runtime = util_models.RuntimeOptions()
        response = client.query_token_for_mns_queue_with_options(request, runtime)
        # print response
        if response is None:
            raise Exception("GET_TOKEN_FAIL", "No response was received when obtaining the token.")

        response_body = response.body
        if response_body.code != "OK":
            raise Exception("GET_TOKEN_FAIL", "Failed to obtain the token.")

        sts_token = response_body.message_token_dto
        self.tmp_access_key = sts_token.access_key_secret
        self.tmp_access_id = sts_token.access_key_id
        self.expire_time = sts_token.expire_time
        self.token = sts_token.security_token

        print("finish refresh token...")


# Initialize token, my_account, and my_queue.
token, my_account, my_queue = Token(), None, None

# Loop to read and delete messages until the queue is empty.
# The receive message request uses long polling. The wait_seconds parameter specifies a long polling period of 3 seconds.

# Long polling explained:
## When messages are in the queue, the request returns immediately.
## When no messages are in the queue, the request hangs on the MNS server for 3 seconds. If a message is written to the queue during this period, the request returns the message immediately. After 3 seconds, the request returns, which indicates that the queue has no messages.

wait_seconds = 3
print("%sReceive And Delete Message From Queue%s\nQueueName:%s\nWaitSeconds:%s\n" % (
    10 * "=", 10 * "=", queue_name, wait_seconds))

while True:
    receipt_handles = []
    # Read messages.
    try:
        # Check whether the token has expired and needs to be refreshed.
        if token.is_refresh() == 1:
            # Refresh the token.
            token.refresh()
            if my_account:
                my_account.mns_client.close_connection()
                my_account = None
        if not my_account:
            my_account = Account(endpoint, token.tmp_access_id, token.tmp_access_key, token.token)
            my_queue = my_account.get_queue(queue_name)
            my_queue.batch_receive_message(10, wait_seconds)

        # Receive messages.
        recv_msgs = my_queue.batch_receive_message(10, wait_seconds)
        print(recv_msgs)

        for recv_msg in recv_msgs:
            # TODO: Business logic processing.

            # receipt_handles.append(recv_msg.receipt_handle)
            print("Receive Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" % (
                recv_msg.receipt_handle, recv_msg.message_body, recv_msg.message_id))

    except MNSExceptionBase as e:
        if e.type == "QueueNotExist":
            print("Queue not exist, please create queue before receive message.")
            break
        elif e.type == "MessageNotExist":
            print("Queue is empty! sleep 10s")
            time.sleep(10)
            continue
        print("Receive Message Fail! Exception:%s\n" % e)
        break

    # Delete messages.
    try:
        if len(receipt_handles) > 0:
            # my_queue.batch_delete_message(receipt_handles)
            print("Delete Message Succeed!  ReceiptHandles:%s" % receipt_handles)
    except MNSExceptionBase as e:
        print("Delete Message Fail! Exception:%s\n" % e)

C# Demo

using Newtonsoft.Json;
using AlibabaCloud.SDK.Dybaseapi20170525;
using AlibabaCloud.SDK.Dybaseapi20170525.Models;
using Aliyun.MNS.Model;
using Aliyun.MNS;
using System.Text;

namespace AlibabaCloud.SDK.Sample
{
    public class Sample
    {
        

        public static void Main(string[] args)
        {
            AlibabaCloud.OpenApiClient.Models.Config config = new AlibabaCloud.OpenApiClient.Models.Config
                {
                    // Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set in the code execution environment.
                    AccessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID"),
                    // Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set in the code execution environment.
                    AccessKeySecret = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
                };
            config.Endpoint = "dybaseapi.aliyuncs.com";
            Client client = new Client(config);

            // Specify the queue name. After you enable the corresponding message service on the Alibaba Cloud Communications page, you can obtain the corresponding queueName from the page.
            String queueName = "Alicom-Queue-******-LlmSmartallReport";
            
            // Specify the message type. For all receipt message types in Artificial Intelligence Cloud Call Service, see the following URL:
            // https://help.aliyun.com/document_detail/2103608.html
            String messageType = "LlmSmartallReport"; 
            


            int maxThread = 2;
            for (int i = 0; i < maxThread; i++)
            {
                TestTask testTask = new TestTask("PullMessageTask-thread-" + i, messageType, queueName, client);
                Thread t = new Thread(new ThreadStart(testTask.Handle));
                // Start the thread.
                t.Start();
            }
            Console.ReadKey();

        }
    }
    class TestTask
    {
        object o = new object();
        const int sleepTime = 50;
        const long bufferTime = 60 * 2; // If the remaining validity period is less than 2 minutes, reacquire the token to prevent server time errors.
        // This is a fixed endpoint for Alibaba Cloud Communications. Do not modify it.
        const String mnsAccountEndpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/";

        public String name { get; private set; }
        public String messageType { get; private set; }
        public String QueueName { get; private set; }
        public int TaskID { get; private set; }
        public Client DybaseapiClient { get; private set; }

        public TestTask(String name, String messageType, String queueName, Client dybaseapiClient)
        {
            this.name = name;
            this.messageType = messageType;
            this.QueueName = queueName;
            this.DybaseapiClient = dybaseapiClient;
        }

        readonly Dictionary<string, QueryTokenForMnsQueueResponseBody.QueryTokenForMnsQueueResponseBodyMessageTokenDTO> tokenMap = new Dictionary<string, QueryTokenForMnsQueueResponseBody.QueryTokenForMnsQueueResponseBodyMessageTokenDTO>();
        readonly Dictionary<string, Queue> queueMap = new Dictionary<string, Queue>();

        public QueryTokenForMnsQueueResponseBody.QueryTokenForMnsQueueResponseBodyMessageTokenDTO GetTokenByMessageType(Client dybaseapiClient, String messageType, String queueName)
        {
            var request = new QueryTokenForMnsQueueRequest
            {
                MessageType = messageType,
                QueueName = queueName
            };
            var queryTokenForMnsQueueResponse = dybaseapiClient.QueryTokenForMnsQueue(request);
            var token = queryTokenForMnsQueueResponse.Body.MessageTokenDTO;
            return token;
        }

        /// Process messages.
        public void Handle()
        {
            while (true)
            {
                try
                {
                    QueryTokenForMnsQueueResponseBody.QueryTokenForMnsQueueResponseBodyMessageTokenDTO token = null;
                    Queue queue = null;
                    lock (o)
                    {
                        if (tokenMap.ContainsKey(messageType))
                        {
                            token = tokenMap[messageType];
                        }

                        if (queueMap.ContainsKey(QueueName))
                        {
                            queue = queueMap[QueueName];
                        }

                        TimeSpan ts = new TimeSpan(0);

                        if (token != null)
                        {
                            DateTime b = Convert.ToDateTime(token.ExpireTime);
                            DateTime c = Convert.ToDateTime(DateTime.Now);
                            ts = b - c;
                        }

                        if (token == null || ts.TotalSeconds < bufferTime || queue == null)
                        {
                            token = GetTokenByMessageType(DybaseapiClient, messageType, QueueName);

                            var client = new MNSClient(token.AccessKeyId, token.AccessKeySecret, mnsAccountEndpoint, token.SecurityToken);
                            queue = client.GetNativeQueue(QueueName);
                            if (tokenMap.ContainsKey(messageType))
                            {
                                tokenMap.Remove(messageType);
                            }
                            if (queueMap.ContainsKey(QueueName))
                            {
                                queueMap.Remove(QueueName);
                            }
                            tokenMap.Add(messageType, token);
                            queueMap.Add(QueueName, queue);
                        }
                    }

                    BatchReceiveMessageResponse batchReceiveMessageResponse = queue.BatchReceiveMessage(16);
                    List<Message> messages = batchReceiveMessageResponse.Messages;

                    for (int i = 0; i <= messages.Count - 1; i++)
                    {
                        try
                        {
                            byte[] outputb = Convert.FromBase64String(messages[i].Body);
                            string orgStr = Encoding.UTF8.GetString(outputb);
                            Console.WriteLine(orgStr);
                            // TODO: Implement the specific consumption logic.
                            // Delete the message after it is successfully consumed.
                            // queue.DeleteMessage(messages[i].ReceiptHandle);
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(e.ToString());
                        }
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.ToString());
                }
                Thread.Sleep(sleepTime);
            }
        }
    }
}

PHP Demo

<?php
use AlibabaCloud\Tea\Utils\Utils\RuntimeOptions;
use Darabonba\OpenApi\Models\Config;
use AlibabaCloud\SDK\Dybaseapi\V20170525\Dybaseapi;
use AlibabaCloud\SDK\Dybaseapi\V20170525\Models\QueryTokenForMnsQueueRequest;
use Exception;

use AliyunMNS\Client;
use AliyunMNS\Requests\BatchReceiveMessageRequest;
use AliyunMNS\Requests\BatchDeleteMessageRequest;

require_once 'vendor/autoload.php';

// An Alibaba Cloud account AccessKey has permissions to access all APIs. We recommend that you use a Resource Access Management (RAM) user for API access or routine O&M.
// We strongly recommend that you do not save your AccessKey ID and AccessKey secret in your project code. This can lead to an AccessKey leak and threaten the security of all resources in your account.
// This example shows how to use environment variables to store the AccessKey ID and AccessKey secret for identity verification.
$config = new Config([
    // Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set in the code execution environment.
    "accessKeyId" => getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
    // Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set in the code execution environment.
    "accessKeySecret" => getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
]);
$config->endpoint = "dybaseapi.aliyuncs.com";
$client = new Dybaseapi($config);

// The queue name. Replace it with your queue name.
// After you enable the corresponding message service on the Alibaba Cloud Communications page, you can obtain the corresponding queueName from the page. The format is similar to Alicom-Queue-******-LlmSmartallReport.
$queueName = 'Alicom-Queue-*******-LlmSmartallReport'; 
// The message type to receive. For all receipt message types in Artificial Intelligence Cloud Call Service, see the following URL:
// https://help.aliyun.com/document_detail/2103608.html
$messageType = 'LlmSmartallReport'; 

$response = null;
$token = null;
$i = 0;

do {
    try {
        if (null == $token || strtotime($token->expireTime) - time() > 2 * 60) {
            $request = new QueryTokenForMnsQueueRequest([
                'messageType' => $messageType,
                'queueName' => $queueName,
            ]);

            $runtime = new RuntimeOptions([]);
            $response = $client->queryTokenForMnsQueueWithOptions($request, $runtime);
            $token = $response->body;
        }

        $token = $response->body->messageTokenDTO;

        echo "receive message\n";
        $mnsClient = new Client(
            "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com",  // This is a fixed endpoint for Alibaba Cloud Communications. Do not modify it.
            $token->accessKeyId,
            $token->accessKeySecret,
            $token->securityToken
        );
        $queue = $mnsClient->getQueueRef($queueName);

        $mnsRequest = new BatchReceiveMessageRequest(10, 5);
        $mnsRequest->setQueueName($queueName);
        $mnsResponse = $queue->batchReceiveMessage($mnsRequest);

        $receiptHandles = [];
        foreach ($mnsResponse->getMessages() as $message) {
            echo $message->getMessageBody() . "\n";
            // User logic:
            // $receiptHandles[] = $message->ReceiptHandle; // Records added to the $receiptHandles array are deleted.
        }

        if (count($receiptHandles) > 0) {
            $deleteRequest = new BatchDeleteMessageRequest($queueName, $receiptHandles);
            $queue->batchDeleteMessage($deleteRequest);
        }
    } catch (Exception $e) {
         echo $e . PHP_EOL;
        if ($e->getCode() == 404) {
            $i++;
        }
        echo $e . PHP_EOL;
    }
} while ($i < 3);

Go Demo

package main

import (
	"encoding/base64"
	"fmt"
	"os"
	"time"

	openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
	dybaseapiClient "github.com/alibabacloud-go/dybaseapi-20170525/client"
	"github.com/alibabacloud-go/tea/tea"
	mns "github.com/aliyun/aliyun-mns-go-sdk"
)

const (
	// This is a fixed endpoint for Alibaba Cloud Communications. Do not modify it.
	mnsDomain = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com"
	// The queue name. Replace it with the name of the queue from which you want to receive messages.
	// After you enable the corresponding message service on the Alibaba Cloud Communications page, you can obtain the corresponding queueName from the page. The format is similar to Alicom-Queue-******-LlmSmartallReport.
	queueName = "Alicom-Queue-******-LlmSmartallReport"
	// The message type. Replace it with the message type that you want to receive. For all receipt message types in Artificial Intelligence Cloud Call Service, see the following URL:
	// https://help.aliyun.com/document_detail/2103608.html
	messageType = "LlmSmartallReport"
)

func main() {
	// An Alibaba Cloud account AccessKey has permissions to access all APIs. We recommend that you use a RAM user for API access or routine O&M.
	// We strongly recommend that you do not save your AccessKey ID and AccessKey secret in your project code. This can lead to an AccessKey leak and threaten the security of all resources in your account.
	// This example shows how to use environment variables to store the AccessKey ID and AccessKey secret for identity verification.
	// Create a client instance.
	config := &openapi.Config{
		// Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set in the code execution environment.
		AccessKeyId: tea.String(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")),
		// Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set in the code execution environment.
		AccessKeySecret: tea.String(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")),
	}
	config.Endpoint = tea.String("dybaseapi.aliyuncs.com")
	client, _err := dybaseapiClient.NewClient(config)
	if _err != nil {
		panic(_err)
	}

	cstLoc, _ := time.LoadLocation("Asia/Shanghai")
	var token *dybaseapiClient.QueryTokenForMnsQueueResponseBodyMessageTokenDTO
	var expireTime time.Time

	// Loop to read messages.
	for {
		// If the token expires, reacquire it.
		if token == nil || expireTime.Unix()-time.Now().Unix() < 2*60 {
			request := &dybaseapiClient.QueryTokenForMnsQueueRequest{}
			request.MessageType = tea.String(messageType)
			request.QueueName = tea.String(queueName)
			response, err := client.QueryTokenForMnsQueue(request)
			if err != nil {
				panic(err)
			}
			token = response.Body.MessageTokenDTO
			expireTime, err = time.ParseInLocation("2006-01-02 15:04:05", *token.ExpireTime, cstLoc)
			if err != nil {
				panic(err)
			}
		}

		clientConfig := mns.AliMNSClientConfig{
			EndPoint:        mnsDomain,
			AccessKeyId:     *token.AccessKeyId,
			AccessKeySecret: *token.AccessKeySecret,
			Token:           *token.SecurityToken,
		}
		mnsClient := mns.NewAliMNSClientWithConfig(clientConfig)
		queue := mns.NewMNSQueue(queueName, mnsClient)

		endChan := make(chan int)
		respChan := make(chan mns.BatchMessageReceiveResponse)
		errChan := make(chan error)
		// Message processing coroutine.
		go func() {
			select {
			case resp := <-respChan:
				{
					// Message response processing. When a message is received through respChan:
					// fmt.Printf("response: %+v\n", resp)

					// Process each message.
					for _, msg := range resp.Messages {
						// Parse the message.
						messageBody, decodeErr := base64.StdEncoding.DecodeString(msg.MessageBody)
						if decodeErr != nil {
							panic(decodeErr)
						}
						fmt.Println("message body: ", string(messageBody))
						// todo: Business logic processing.

						if ret, e := queue.ChangeMessageVisibility(msg.ReceiptHandle, 5); e != nil {
							fmt.Printf("ChangeMessageVisibility: %+v\n", e)
						} else {
							fmt.Printf("visibility changed: %+v\n", ret)
							fmt.Println("delete it now: ", ret.ReceiptHandle)

							// After the visibility is changed, use queue.DeleteMessage to delete the message.
							if e := queue.DeleteMessage(ret.ReceiptHandle); e != nil {
								fmt.Println(e)
							}
						}
					}

					endChan <- 1
				}
			case <-errChan:
				{
					// todo: Error handling. When an error is received through errChan:
					// fmt.Println(err)

					endChan <- 1
				}
			}
		}()

		// Receive messages.
		queue.BatchReceiveMessage(respChan, errChan, 10, 5)
		<-endChan
	}
}