Send and receive transactional messages

更新时间:
复制 MD 格式

This topic provides the sample code for using TCP SDK for Java to send and receive transactional messages.

MSMQ provides a distributed transaction function similar to X/Open XA, which can achieve the final consistency of distributed transactions by MSMQ transaction messages.

Note

For new users, we recommend that you read the demo project to learn how to build a MSMQ project before sending and receiving messages.

Interaction process

The following figure shows the interaction process of transactional messages.

事务消息交互流程

For more information, see Message types> Transactional messages.

Prerequisites

You have completed the following operations:

Send transactional messages

Note

For more information about the sample code, see the MSMQ code library.

Sending a transactional message involves the following two steps:

  1. Send a half message and execute a local transaction. The following sample code provides an example:

    import java.util.Properties;
    import com.alipay.sofa.sofamq.client.PropertyKeyConst;
    import io.openmessaging.api.Message;
    import io.openmessaging.api.MessagingAccessPoint;
    import io.openmessaging.api.OMS;
    import io.openmessaging.api.OMSBuiltinKeys;
    import io.openmessaging.api.transaction.LocalTransactionChecker;
    import io.openmessaging.api.transaction.LocalTransactionExecuter;
    import io.openmessaging.api.transaction.TransactionProducer;
    import io.openmessaging.api.transaction.TransactionStatus;
    
    public class TransactionProducerTest {
        public static void main(String... args) {
            Properties credentials = new Properties();
            // The AccessKey pair of an Alibaba Cloud account has permissions to access all API operations. This is a high risk. We strongly recommend that you create and use a RAM user for API access or routine O&M. Log on to the RAM console to create a RAM user. 
            // Save the AccessKey pair and AccessKeySecret in environment variables. 
            // We strongly recommend that you do not save the AccessKey and AccessKeySecret in the code. This may cause key leakage.
            credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");        
            credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");
    
            // Set the TCP endpoint and go to the Overview page in the console to view the endpoint configuration.
            MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint").withCredentials(credentials).build();
    
            Properties properties = new Properties();
            // Specify a user instance and go to the Overview page in the console to view the endpoint configuration.
            properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
            properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
    
            TransactionProducer producer = accessPoint.createTransactionProducer(properties, newLocalTransactionChecker() {
                @Override
                public TransactionStatus check (Message msg){
                    // check business commit status
                    return TransactionStatus.CommitTransaction;
                }
            });
            producer.start();
    
            Message message = new Message("$topic", "YOUR_TAG", "hello world".getBytes());
            producer.send(message, new LocalTransactionExecuter() {
                @Override
                public TransactionStatus execute(Message msg, Object arg) {
                    // if business success, then commit; else rollback
                    return TransactionStatus.CommitTransaction;
                }
            }, null);
        }
    }
  2. The status of the committed transactional message. When the execution of a local transaction is completed (the execution succeeds or fails), the server needs to be notified of the transaction status of the current message. You can use one of the following notification methods:

    • Commit after the local transaction is executed.

    • The status of the local transaction has not been committed until the server checks the transaction status of the message. The following three types of transaction statuses are available:

      • TransactionStatus.CommitTransaction the transaction is committed, the subscriber is allowed to consume the message.

      • TransactionStatus.RollbackTransaction the transaction is rolled back, the message is discarded and consumption is not allowed.

      • TransactionStatus.Unknow the status cannot be determined, expect the MSMQ broker to query the status of the local transaction corresponding to the message to the sender again.

Transaction check mechanism

  • Why must the check mechanism be implemented to send transactional messages? When the half message is sent in Step 1, but the local transaction returns a status of TransactionStatus.Unknow, or the application exits and the local transaction does not commit any status, the status of the half message is unknown from the perspective of the broker. Therefore, the broker periodically requires the sender to check the half status message and report its final status.

  • What does the business logic need to do when Check is called back? In the Check method of transactional messages, some logic for checking transactional consistency should be written. When the MSMQ sends a transactional message, it needs to implement the LocalTransactionChecker interface to process the local transaction status check request initiated by the Broker. Therefore, in the Check method of the transactional message, two things need to be completed:

    1. Check the status (committed or rollback) of the local transaction corresponding to the half transactional message.

    2. Commit the status of the local transaction of the half-transactional message to the broker.

Subscribe to transactional messages

The subscription of transactional messages is the same as that of normal messages. For more information, see Subscribe to messages.