This topic provides code samples for receiving messages in popular programming languages. You can download the corresponding SDK packages to pull messages from a queue.
Downloads
The SDK download links provide the Alibaba Cloud Communications SDK core library for each supported language and the dybaseapi package. The dybaseapi package is used to pull messages from a queue. For languages other than Java and Node.js, you must also install the Alibaba Cloud V2 SDK separately.
Language | SDK download link |
Java | |
Node.js | |
.NET/C# |
|
Python |
|
Go |
|
PHP |
|
Notes
When you use the sample demos, note the following. The Java language is used as an example, but the notes are also applicable to the other languages.
Configure your AccessKey ID and AccessKey secret.
To prevent your AccessKey pair from being leaked, do not hard-code it in your code. We recommend that you obtain the AccessKey pair by configuring environment variables. For more information, see Configure environment variables on Linux, macOS, and Windows systems.
This topic uses the environment variable names
ALIBABA_CLOUD_ACCESS_KEY_IDandALIBABA_CLOUD_ACCESS_KEY_SECRETas examples. The following code shows how to obtain the AccessKey pair from environment variables:String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");Replace
messageTypewith the required message type, such as the recording message (LlmSmartCallRecord) from the Communication Intelligence Engine. For more information about the receipt message types supported by Artificial Intelligence Cloud Call Service, see Introduction to receipt messages and the configuration process.String messageType="messageType";queueNameis the queue name. For example, for the Communication Intelligence Engine, you can view the queue name on the page. To go to the page, choose Large Model Communication > Communication Intelligence Engine in the Artificial Intelligence Cloud Call Service console.String queueName="queueName";In the sample Java code, incoming message content is processed by the
dealMessagemethod. You can add your business logic for processing the message content in this method. Theargparameter represents the receipt message body, which contains parameters such as start_time, end_time, duration, and status_code. We recommend that you process the parameters that are relevant to your scenario.// Parse the message body based on the specific message format in the documentation. String arg = (String) contentMap.get("arg"); // Write your business logic.
Sample demos
Java Demo
package com.alicom.mns.sample;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import com.alicom.mns.tools.DefaultAlicomMessagePuller;
import com.alicom.mns.tools.MessageListener;
import com.aliyun.mns.model.Message;
import com.google.gson.Gson;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* This can only be used to receive messages from Alibaba Cloud Communications. It cannot be used to receive messages from other services.
* Demo for receiving incoming messages.
*/
public class ReceiveDemo {
private static Logger logger = Logger.getLogger(ReceiveDemo.class.getName());
static class MyMessageListener implements MessageListener {
private Gson gson = new Gson();
@Override
public boolean dealMessage(Message message) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Key values of the message
System.out.println("message receiver time from mns:" + format.format(new Date()));
System.out.println("message handle: " + message.getReceiptHandle());
System.out.println("message body: " + message.getMessageBodyAsString());
System.out.println("message id: " + message.getMessageId());
System.out.println("message dequeue count:" + message.getDequeueCount());
System.out.println("Thread:" + Thread.currentThread().getName());
try {
Map<String, Object> contentMap = gson.fromJson(message.getMessageBodyAsString(), HashMap.class);
//TODO: Parse the message body based on the specific message format in the documentation.
String arg = (String) contentMap.get("arg");
//TODO: Start writing your business logic here.
} catch (com.google.gson.JsonSyntaxException e) {
logger.log(Level.SEVERE, "error_json_format:" + message.getMessageBodyAsString(), e);
// In theory, format errors do not occur. If a format error occurs, you can only delete the message. Otherwise, an error is continuously reported upon redelivery.
return true;
} catch (Throwable e) {
// If an exception is caused by your code, return false. This way, the message is not deleted and is redelivered based on the policy.
return false;
}
// If the message is processed successfully, return true. The SDK calls the MNS delete method to delete the message from the queue.
return true;
}
}
public static void main(String[] args) throws Exception, ParseException {
DefaultAlicomMessagePuller puller = new DefaultAlicomMessagePuller();
// Set the size of the asynchronous thread pool, the size of the task queue, and the hibernation time for threads when no data is available.
puller.setConsumeMinThreadSize(6);
puller.setConsumeMaxThreadSize(16);
puller.setThreadQueueSize(200);
puller.setPullMsgThreadSize(1);
// Enable this for joint debugging with the server. Do not enable it for normal use because it affects performance.
puller.openDebugLog(false);
//TODO: This is the AccessKey information obtained from the operating system.
String accessKeyId=System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKeySecret=System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
/*
* TODO: Replace messageType and queueName with the required message type name and the corresponding queue name.
* For all receipt message types in Artificial Intelligence Cloud Call Service, see the following URL:
* https://help.aliyun.com/document_detail/2103608.html
*/
// Replace this with the message type of the corresponding product.
String messageType = "messageType";
// After you enable the corresponding message service on the Alibaba Cloud Communications page, you can obtain the corresponding queueName from the page. The format is similar to Alicom-Queue-******-LlmSmartallReport.
String queueName = "queueName";
puller.startReceiveMsg(accessKeyId, accessKeySecret, messageType, queueName, new MyMessageListener());
}
}Python Demo
import os
import time
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dybaseapi20170525.client import Client as OpenApiClient
from alibabacloud_dybaseapi20170525.models import QueryTokenForMnsQueueRequest
from datetime import datetime
from mns.account import Account
from mns.queue import *
from mns.mns_exception import *
try:
import json
except ImportError:
import simplejson as json
# TODO: Replace this with the message type that you want to receive.
# For all receipt message types in Artificial Intelligence Cloud Call Service, see the following URL:
# https://help.aliyun.com/document_detail/2103608.html
message_type = "LlmSmartallReport"
# TODO: Replace this with your queue name. After you enable the corresponding message service on the Alibaba Cloud Communications page, you can obtain the corresponding queue_name from the page.
queue_name = f"Alicom-Queue-******-LlmSmartallReport"
# This is a fixed endpoint for Alibaba Cloud Communications. Do not modify it.
endpoint = f"https://1943695596114318.mns.cn-hangzhou.aliyuncs.com"
config = open_api_models.Config(
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Set the endpoint.
config.endpoint = f'dybaseapi.aliyuncs.com'
client = OpenApiClient(config)
# The service token for Alibaba Cloud Communications has an expiration time and must be dynamically updated.
class Token():
def __init__(self):
self.token = None
self.tmp_access_id = None
self.tmp_access_key = None
self.expire_time = None
def is_refresh(self):
if self.expire_time is None:
return 1
# Compare the expiration time with the current system time. Refresh the token 2 minutes in advance.
now = datetime.now()
expire = datetime.strptime(self.expire_time, "%Y-%m-%d %H:%M:%S")
if expire <= now or (expire - now).seconds < 120:
return 1
return 0
def refresh(self):
print("start refresh token...")
request = QueryTokenForMnsQueueRequest()
request.message_type = message_type
request.queue_name = queue_name
runtime = util_models.RuntimeOptions()
response = client.query_token_for_mns_queue_with_options(request, runtime)
# print response
if response is None:
raise Exception("GET_TOKEN_FAIL", "No response was received when obtaining the token.")
response_body = response.body
if response_body.code != "OK":
raise Exception("GET_TOKEN_FAIL", "Failed to obtain the token.")
sts_token = response_body.message_token_dto
self.tmp_access_key = sts_token.access_key_secret
self.tmp_access_id = sts_token.access_key_id
self.expire_time = sts_token.expire_time
self.token = sts_token.security_token
print("finish refresh token...")
# Initialize token, my_account, and my_queue.
token, my_account, my_queue = Token(), None, None
# Loop to read and delete messages until the queue is empty.
# The receive message request uses long polling. The wait_seconds parameter specifies a long polling period of 3 seconds.
# Long polling explained:
## When messages are in the queue, the request returns immediately.
## When no messages are in the queue, the request hangs on the MNS server for 3 seconds. If a message is written to the queue during this period, the request returns the message immediately. After 3 seconds, the request returns, which indicates that the queue has no messages.
wait_seconds = 3
print("%sReceive And Delete Message From Queue%s\nQueueName:%s\nWaitSeconds:%s\n" % (
10 * "=", 10 * "=", queue_name, wait_seconds))
while True:
receipt_handles = []
# Read messages.
try:
# Check whether the token has expired and needs to be refreshed.
if token.is_refresh() == 1:
# Refresh the token.
token.refresh()
if my_account:
my_account.mns_client.close_connection()
my_account = None
if not my_account:
my_account = Account(endpoint, token.tmp_access_id, token.tmp_access_key, token.token)
my_queue = my_account.get_queue(queue_name)
my_queue.batch_receive_message(10, wait_seconds)
# Receive messages.
recv_msgs = my_queue.batch_receive_message(10, wait_seconds)
print(recv_msgs)
for recv_msg in recv_msgs:
# TODO: Business logic processing.
# receipt_handles.append(recv_msg.receipt_handle)
print("Receive Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" % (
recv_msg.receipt_handle, recv_msg.message_body, recv_msg.message_id))
except MNSExceptionBase as e:
if e.type == "QueueNotExist":
print("Queue not exist, please create queue before receive message.")
break
elif e.type == "MessageNotExist":
print("Queue is empty! sleep 10s")
time.sleep(10)
continue
print("Receive Message Fail! Exception:%s\n" % e)
break
# Delete messages.
try:
if len(receipt_handles) > 0:
# my_queue.batch_delete_message(receipt_handles)
print("Delete Message Succeed! ReceiptHandles:%s" % receipt_handles)
except MNSExceptionBase as e:
print("Delete Message Fail! Exception:%s\n" % e)
C# Demo
using Newtonsoft.Json;
using AlibabaCloud.SDK.Dybaseapi20170525;
using AlibabaCloud.SDK.Dybaseapi20170525.Models;
using Aliyun.MNS.Model;
using Aliyun.MNS;
using System.Text;
namespace AlibabaCloud.SDK.Sample
{
public class Sample
{
public static void Main(string[] args)
{
AlibabaCloud.OpenApiClient.Models.Config config = new AlibabaCloud.OpenApiClient.Models.Config
{
// Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set in the code execution environment.
AccessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set in the code execution environment.
AccessKeySecret = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
};
config.Endpoint = "dybaseapi.aliyuncs.com";
Client client = new Client(config);
// Specify the queue name. After you enable the corresponding message service on the Alibaba Cloud Communications page, you can obtain the corresponding queueName from the page.
String queueName = "Alicom-Queue-******-LlmSmartallReport";
// Specify the message type. For all receipt message types in Artificial Intelligence Cloud Call Service, see the following URL:
// https://help.aliyun.com/document_detail/2103608.html
String messageType = "LlmSmartallReport";
int maxThread = 2;
for (int i = 0; i < maxThread; i++)
{
TestTask testTask = new TestTask("PullMessageTask-thread-" + i, messageType, queueName, client);
Thread t = new Thread(new ThreadStart(testTask.Handle));
// Start the thread.
t.Start();
}
Console.ReadKey();
}
}
class TestTask
{
object o = new object();
const int sleepTime = 50;
const long bufferTime = 60 * 2; // If the remaining validity period is less than 2 minutes, reacquire the token to prevent server time errors.
// This is a fixed endpoint for Alibaba Cloud Communications. Do not modify it.
const String mnsAccountEndpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/";
public String name { get; private set; }
public String messageType { get; private set; }
public String QueueName { get; private set; }
public int TaskID { get; private set; }
public Client DybaseapiClient { get; private set; }
public TestTask(String name, String messageType, String queueName, Client dybaseapiClient)
{
this.name = name;
this.messageType = messageType;
this.QueueName = queueName;
this.DybaseapiClient = dybaseapiClient;
}
readonly Dictionary<string, QueryTokenForMnsQueueResponseBody.QueryTokenForMnsQueueResponseBodyMessageTokenDTO> tokenMap = new Dictionary<string, QueryTokenForMnsQueueResponseBody.QueryTokenForMnsQueueResponseBodyMessageTokenDTO>();
readonly Dictionary<string, Queue> queueMap = new Dictionary<string, Queue>();
public QueryTokenForMnsQueueResponseBody.QueryTokenForMnsQueueResponseBodyMessageTokenDTO GetTokenByMessageType(Client dybaseapiClient, String messageType, String queueName)
{
var request = new QueryTokenForMnsQueueRequest
{
MessageType = messageType,
QueueName = queueName
};
var queryTokenForMnsQueueResponse = dybaseapiClient.QueryTokenForMnsQueue(request);
var token = queryTokenForMnsQueueResponse.Body.MessageTokenDTO;
return token;
}
/// Process messages.
public void Handle()
{
while (true)
{
try
{
QueryTokenForMnsQueueResponseBody.QueryTokenForMnsQueueResponseBodyMessageTokenDTO token = null;
Queue queue = null;
lock (o)
{
if (tokenMap.ContainsKey(messageType))
{
token = tokenMap[messageType];
}
if (queueMap.ContainsKey(QueueName))
{
queue = queueMap[QueueName];
}
TimeSpan ts = new TimeSpan(0);
if (token != null)
{
DateTime b = Convert.ToDateTime(token.ExpireTime);
DateTime c = Convert.ToDateTime(DateTime.Now);
ts = b - c;
}
if (token == null || ts.TotalSeconds < bufferTime || queue == null)
{
token = GetTokenByMessageType(DybaseapiClient, messageType, QueueName);
var client = new MNSClient(token.AccessKeyId, token.AccessKeySecret, mnsAccountEndpoint, token.SecurityToken);
queue = client.GetNativeQueue(QueueName);
if (tokenMap.ContainsKey(messageType))
{
tokenMap.Remove(messageType);
}
if (queueMap.ContainsKey(QueueName))
{
queueMap.Remove(QueueName);
}
tokenMap.Add(messageType, token);
queueMap.Add(QueueName, queue);
}
}
BatchReceiveMessageResponse batchReceiveMessageResponse = queue.BatchReceiveMessage(16);
List<Message> messages = batchReceiveMessageResponse.Messages;
for (int i = 0; i <= messages.Count - 1; i++)
{
try
{
byte[] outputb = Convert.FromBase64String(messages[i].Body);
string orgStr = Encoding.UTF8.GetString(outputb);
Console.WriteLine(orgStr);
// TODO: Implement the specific consumption logic.
// Delete the message after it is successfully consumed.
// queue.DeleteMessage(messages[i].ReceiptHandle);
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
Thread.Sleep(sleepTime);
}
}
}
}
PHP Demo
<?php
use AlibabaCloud\Tea\Utils\Utils\RuntimeOptions;
use Darabonba\OpenApi\Models\Config;
use AlibabaCloud\SDK\Dybaseapi\V20170525\Dybaseapi;
use AlibabaCloud\SDK\Dybaseapi\V20170525\Models\QueryTokenForMnsQueueRequest;
use Exception;
use AliyunMNS\Client;
use AliyunMNS\Requests\BatchReceiveMessageRequest;
use AliyunMNS\Requests\BatchDeleteMessageRequest;
require_once 'vendor/autoload.php';
// An Alibaba Cloud account AccessKey has permissions to access all APIs. We recommend that you use a Resource Access Management (RAM) user for API access or routine O&M.
// We strongly recommend that you do not save your AccessKey ID and AccessKey secret in your project code. This can lead to an AccessKey leak and threaten the security of all resources in your account.
// This example shows how to use environment variables to store the AccessKey ID and AccessKey secret for identity verification.
$config = new Config([
// Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set in the code execution environment.
"accessKeyId" => getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set in the code execution environment.
"accessKeySecret" => getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
]);
$config->endpoint = "dybaseapi.aliyuncs.com";
$client = new Dybaseapi($config);
// The queue name. Replace it with your queue name.
// After you enable the corresponding message service on the Alibaba Cloud Communications page, you can obtain the corresponding queueName from the page. The format is similar to Alicom-Queue-******-LlmSmartallReport.
$queueName = 'Alicom-Queue-*******-LlmSmartallReport';
// The message type to receive. For all receipt message types in Artificial Intelligence Cloud Call Service, see the following URL:
// https://help.aliyun.com/document_detail/2103608.html
$messageType = 'LlmSmartallReport';
$response = null;
$token = null;
$i = 0;
do {
try {
if (null == $token || strtotime($token->expireTime) - time() > 2 * 60) {
$request = new QueryTokenForMnsQueueRequest([
'messageType' => $messageType,
'queueName' => $queueName,
]);
$runtime = new RuntimeOptions([]);
$response = $client->queryTokenForMnsQueueWithOptions($request, $runtime);
$token = $response->body;
}
$token = $response->body->messageTokenDTO;
echo "receive message\n";
$mnsClient = new Client(
"https://1943695596114318.mns.cn-hangzhou.aliyuncs.com", // This is a fixed endpoint for Alibaba Cloud Communications. Do not modify it.
$token->accessKeyId,
$token->accessKeySecret,
$token->securityToken
);
$queue = $mnsClient->getQueueRef($queueName);
$mnsRequest = new BatchReceiveMessageRequest(10, 5);
$mnsRequest->setQueueName($queueName);
$mnsResponse = $queue->batchReceiveMessage($mnsRequest);
$receiptHandles = [];
foreach ($mnsResponse->getMessages() as $message) {
echo $message->getMessageBody() . "\n";
// User logic:
// $receiptHandles[] = $message->ReceiptHandle; // Records added to the $receiptHandles array are deleted.
}
if (count($receiptHandles) > 0) {
$deleteRequest = new BatchDeleteMessageRequest($queueName, $receiptHandles);
$queue->batchDeleteMessage($deleteRequest);
}
} catch (Exception $e) {
echo $e . PHP_EOL;
if ($e->getCode() == 404) {
$i++;
}
echo $e . PHP_EOL;
}
} while ($i < 3);Go Demo
package main
import (
"encoding/base64"
"fmt"
"os"
"time"
openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
dybaseapiClient "github.com/alibabacloud-go/dybaseapi-20170525/client"
"github.com/alibabacloud-go/tea/tea"
mns "github.com/aliyun/aliyun-mns-go-sdk"
)
const (
// This is a fixed endpoint for Alibaba Cloud Communications. Do not modify it.
mnsDomain = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com"
// The queue name. Replace it with the name of the queue from which you want to receive messages.
// After you enable the corresponding message service on the Alibaba Cloud Communications page, you can obtain the corresponding queueName from the page. The format is similar to Alicom-Queue-******-LlmSmartallReport.
queueName = "Alicom-Queue-******-LlmSmartallReport"
// The message type. Replace it with the message type that you want to receive. For all receipt message types in Artificial Intelligence Cloud Call Service, see the following URL:
// https://help.aliyun.com/document_detail/2103608.html
messageType = "LlmSmartallReport"
)
func main() {
// An Alibaba Cloud account AccessKey has permissions to access all APIs. We recommend that you use a RAM user for API access or routine O&M.
// We strongly recommend that you do not save your AccessKey ID and AccessKey secret in your project code. This can lead to an AccessKey leak and threaten the security of all resources in your account.
// This example shows how to use environment variables to store the AccessKey ID and AccessKey secret for identity verification.
// Create a client instance.
config := &openapi.Config{
// Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set in the code execution environment.
AccessKeyId: tea.String(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")),
// Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set in the code execution environment.
AccessKeySecret: tea.String(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")),
}
config.Endpoint = tea.String("dybaseapi.aliyuncs.com")
client, _err := dybaseapiClient.NewClient(config)
if _err != nil {
panic(_err)
}
cstLoc, _ := time.LoadLocation("Asia/Shanghai")
var token *dybaseapiClient.QueryTokenForMnsQueueResponseBodyMessageTokenDTO
var expireTime time.Time
// Loop to read messages.
for {
// If the token expires, reacquire it.
if token == nil || expireTime.Unix()-time.Now().Unix() < 2*60 {
request := &dybaseapiClient.QueryTokenForMnsQueueRequest{}
request.MessageType = tea.String(messageType)
request.QueueName = tea.String(queueName)
response, err := client.QueryTokenForMnsQueue(request)
if err != nil {
panic(err)
}
token = response.Body.MessageTokenDTO
expireTime, err = time.ParseInLocation("2006-01-02 15:04:05", *token.ExpireTime, cstLoc)
if err != nil {
panic(err)
}
}
clientConfig := mns.AliMNSClientConfig{
EndPoint: mnsDomain,
AccessKeyId: *token.AccessKeyId,
AccessKeySecret: *token.AccessKeySecret,
Token: *token.SecurityToken,
}
mnsClient := mns.NewAliMNSClientWithConfig(clientConfig)
queue := mns.NewMNSQueue(queueName, mnsClient)
endChan := make(chan int)
respChan := make(chan mns.BatchMessageReceiveResponse)
errChan := make(chan error)
// Message processing coroutine.
go func() {
select {
case resp := <-respChan:
{
// Message response processing. When a message is received through respChan:
// fmt.Printf("response: %+v\n", resp)
// Process each message.
for _, msg := range resp.Messages {
// Parse the message.
messageBody, decodeErr := base64.StdEncoding.DecodeString(msg.MessageBody)
if decodeErr != nil {
panic(decodeErr)
}
fmt.Println("message body: ", string(messageBody))
// todo: Business logic processing.
if ret, e := queue.ChangeMessageVisibility(msg.ReceiptHandle, 5); e != nil {
fmt.Printf("ChangeMessageVisibility: %+v\n", e)
} else {
fmt.Printf("visibility changed: %+v\n", ret)
fmt.Println("delete it now: ", ret.ReceiptHandle)
// After the visibility is changed, use queue.DeleteMessage to delete the message.
if e := queue.DeleteMessage(ret.ReceiptHandle); e != nil {
fmt.Println(e)
}
}
}
endChan <- 1
}
case <-errChan:
{
// todo: Error handling. When an error is received through errChan:
// fmt.Println(err)
endChan <- 1
}
}
}()
// Receive messages.
queue.BatchReceiveMessage(respChan, errChan, 10, 5)
<-endChan
}
}