Transmit oversized messages

更新时间:
复制 MD 格式

Simple Message Queue (formerly MNS) limits each message to 64 KB. To send larger messages without splitting them, you can store the message body in Object Storage Service (OSS) and pass only an object reference through the queue. This Claim-Check pattern lets producers and consumers exchange payloads of any size while staying within the queue size limit.

How it works

  1. The producer checks the message body size. If the body exceeds 64 KB, the producer uploads it as an object to OSS.

  2. The producer sends an OSS object reference (not the full body) to the SMQ queue.

  3. The consumer reads the message from the queue and checks whether the body is an OSS object reference.

  4. If the body is an OSS object reference, the consumer downloads the object from OSS and returns the full message body to the application.

The following diagram shows this workflow.

image

Usage notes

  • Oversized messages consume significant network bandwidth. Ensure that both the producer and consumer have sufficient bandwidth for the expected message sizes.

  • Large message transfers are time-consuming and may be affected by network jitter. Implement retries in your application to handle transient errors.

Prerequisites

Before you begin, make sure that you have:

Sample code

The following example shows how to implement the Claim-Check pattern with SMQ and OSS. You can download the full source from LargeMessageDemo.java.

package com.aliyun.mns.sample.scenarios.largeMessage;

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.scenarios.largeMessage.service.MNSExtendedClient;
import com.aliyun.mns.sample.scenarios.largeMessage.service.bean.MNSExtendedConfiguration;
import com.aliyun.mns.sample.scenarios.largeMessage.service.impl.MNSExtendedClientImpl;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
import com.aliyuncs.exceptions.ClientException;
import org.junit.Assert;

public class LargeMessageDemo {

    private final static String OSS_ENDPOINT = "oss-cn-XXX.aliyuncs.com";
    private final static String OSS_BUCKET_NAME = "mns-test-XXXXX-bucket";
    private final static String MNS_QUEUE_NAME = "test-largeMessage-queue";
    private final static String MNS_TOPIC_NAME = "test-largeMessage-topic";
    /**
     * In this example, messages whose size is larger than 4 KB are sent to OSS.
     */
    private final static Long payloadSizeThreshold = 4L;

    public static void main(String[] args) throws ClientException {
        // Get access credentials from environment variables.
        EnvironmentVariableCredentialsProvider credentialsProvider =
            CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();

        // Create an OSS client.
        OSS ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, credentialsProvider);

        // Create an SMQ client.
        // Configure the AccessKey ID and AccessKey secret as environment variables.
        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient();
        CloudQueue queue = client.getQueueRef(MNS_QUEUE_NAME);
        CloudTopic cloudTopic = client.getTopicRef(MNS_TOPIC_NAME);

        // Re-create the queue and topic for a clean demo environment.
        ReCreateUtil.reCreateQueue(client, MNS_QUEUE_NAME);
        ReCreateUtil.reCreateTopic(client, MNS_TOPIC_NAME);

        // Configure the extended client with OSS and SMQ settings.
        MNSExtendedConfiguration configuration = new MNSExtendedConfiguration()
            .setOssClient(ossClient).setOssBucketName(OSS_BUCKET_NAME)
            .setMNSQueue(queue)
            .setMNSTopic(cloudTopic)
            .setPayloadSizeThreshold(payloadSizeThreshold);

        MNSExtendedClient mnsExtendedClient = new MNSExtendedClientImpl(configuration);

        // Send and receive a normal-sized message.
        Message normalMessage = new Message();
        normalMessage.setMessageBodyAsRawString("1");
        mnsExtendedClient.sendMessage(normalMessage);
        Message message = mnsExtendedClient.receiveMessage(10);
        System.out.println("[normal]ReceiveMsg:" + message.getMessageBodyAsRawString());
        mnsExtendedClient.deleteMessage(message.getReceiptHandle());

        // Send and receive an oversized message.
        String largeMsgBody = "largeMessage";
        Assert.assertTrue(largeMsgBody.getBytes().length > payloadSizeThreshold);

        Message largeMessage = new Message();
        largeMessage.setMessageBodyAsRawString(largeMsgBody);

        mnsExtendedClient.sendMessage(largeMessage);
        Message receiveMessage = mnsExtendedClient.receiveMessage(10);
        System.out.println("[large]ReceiveMsg:" + receiveMessage.getMessageBodyAsRawString());
        mnsExtendedClient.deleteMessage(receiveMessage.getReceiptHandle());

        client.close();
        ossClient.shutdown();
    }
}