Subscribe to messages

更新时间:
复制 MD 格式

This topic describes how to subscribe to messages by using SOFAStack MSMQ SDK for Java.

Subscription mode

MSMQ supports one of the following subscription methods:

  • A cluster subscribes to all consumers identified by the same group ID and consumes messages equally. For example, a topic has nine messages and a group ID has three consumer instances. In the cluster consumption mode, each instance consumes only three messages.

    // Set the cluster subscription mode. If you do not set this parameter, the cluster subscription mode is used by default.
      properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.CLUSTERING);
  • All consumers identified by the same group ID in a broadcast subscription consume a message once. For example, a topic has nine messages and a group ID has three consumer instances. In the broadcast consumption mode, each instance consumes nine messages.

    // Set the broadcast subscription method.
      properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.BROADCASTING);
Note

Make sure that all consumer instances under the same group ID have the same subscription relationship. For more information, see Subscription consistency. The two subscription methods have different limits. For example, the broadcast mode does not support ordered messages, does not maintain the consumption progress, and does not support resetting the consumer offset. For more information, see Cluster consumption and broadcast consumption.

Sample code

For more information about the sample code, see the MSMQ code library.

import java.util.Properties;
import com.alipay.sofa.sofamq.client.PropertyKeyConst;
import io.openmessaging.api.Action;
import io.openmessaging.api.ConsumeContext;
import io.openmessaging.api.Consumer;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessageListener;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;

public class MConsumer {
    public static void main(String... args) {
        Properties credentials = new Properties();
        // The AccessKey pair of an Alibaba Cloud account has permissions to access all API operations. This is a high risk. We strongly recommend that you create and use a RAM user for API access or routine O&M. Log on to the RAM console to create a RAM user. 
        // Save the AccessKey pair and AccessKeySecret in environment variables. 
        // We strongly recommend that you do not save the AccessKey and AccessKeySecret in the code. This may cause key leakage.
        credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");        
        credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");

        // Set the TCP endpoint and go to the Overview page in the console to view the endpoint configuration.
        MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
                .withCredentials(credentials).build();

        Properties properties = new Properties();
        // Specify a user instance and go to the Overview page in the console to view the endpoint configuration.
        properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
        properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");

        // The cluster subscription method. This is the default value.
        // properties.put(PropertyKeyConst.MESSAGE_MODEL, PropertyValueConst.CLUSTERING);
        // The broadcast subscription method.
        // properties.put(PropertyKeyConst.MESSAGE_MODEL, PropertyValueConst.BROADCASTING);

        Consumer consumer = accessPoint.createConsumer(properties);
        consumer.subscribe("YOUR_TOPIC", "TAGA||TAGB", new MessageListener() {
            @Override
            public Action consume (Message message, ConsumeContext context){
                System.out.println(new String(message.getBody()));
                return Action.CommitMessage;
            }
        });
        consumer.start();
    }
}