Step 3: Develop a unitized application

In a unitized architecture, you must modify your local application and configure unitization features. This topic describes how to perform LDC unitization-related development for SOFARPC in microservices (MS), Message Queue (MQ), and Distributed Transaction (DTX), using scenarios such as fund transfers and point accumulation as examples.

Prerequisites

The routing parameter is userId, with a format such as 080066600000002. The sharding key is generated by taking the first digit of the userId and appending "0". For example, if the userId is 080066600000002, the sharding key is 00. If the userId is 180066600000000, the sharding key is 10.

SOFARPC in microservices

The standard usage specifications for RPC service references have the following requirements:

  1. The first input parameter of the interface must be the userId routing parameter.

  2. By default, SOFARPC extracts this parameter to generate a two-digit sharding key. The default implementation extracts the third-to-last and second-to-last digits of the UID. For more information, see com.alipay.sofa.rpc.ldc.DefaultLdcRouteProvider.
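
The default rule in item 2 can be sketched as follows. This is only an illustration of "third-to-last and second-to-last digits"; it is not the source of DefaultLdcRouteProvider, and the class name DefaultShardingKeySketch is hypothetical.

```java
// Illustrative only: NOT the actual source of DefaultLdcRouteProvider.
final class DefaultShardingKeySketch {
    private DefaultShardingKeySketch() { }

    /** Returns the third-to-last and second-to-last characters of the UID. */
    static String fromUid(String uid) {
        if (uid == null || uid.length() < 3) {
            throw new IllegalArgumentException("uid must have at least 3 characters");
        }
        int end = uid.length() - 1;          // exclusive bound: drops the last character
        return uid.substring(end - 2, end);  // third-to-last and second-to-last characters
    }
}
```

Under this sketch, the userIds 080066600000002 and 180066600000000 from the prerequisites both yield 00, whereas the demo expects 00 and 10.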

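This demo does not meet the second requirement, because its sharding key is derived from the first digit of the userId instead of the third-to-last and second-to-last digits. Therefore, you must create a custom PocLdcRouteProvider to implement the logic for extracting the two-digit sharding key. This custom provider overrides the standard implementation of DefaultLdcRouteProvider.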

import com.alipay.sofa.rpc.api.ldc.LdcRouteJudgeResult;
import com.alipay.sofa.rpc.api.ldc.LdcRouteProvider;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.zoneclient.api.EnterpriseZoneClientHolder;

public class PocLdcRouteProvider implements LdcRouteProvider {
    @Override
    public LdcRouteJudgeResult uidGenerator(ConsumerConfig consumerConfig, SofaRequest sofaRequest) {
        LdcRouteJudgeResult result = new LdcRouteJudgeResult();

        // If LDC is not enabled, return an unsuccessful result so that no route ID is applied.
        if (!EnterpriseZoneClientHolder.isZoneMode()) {
            result.setSuccess(false);
            return result;
        }

        Object[] methodArgs = sofaRequest.getMethodArgs();
        if (CommonUtils.isEmpty(methodArgs)) {
            result.setSuccess(false);
            return result;
        }
        Object methodArg = methodArgs[0];
        if (methodArg instanceof String) {
            String routeUid = (String) methodArg;
            String uid = UIDUtil.parseShardingKeyFromBacc(routeUid);
            result.setSuccess(true);
            result.setRouteId(uid);
            return result;
        }
        result.setSuccess(false);
        return result;
    }

    @Override
    public int order() {
        return 2;
    }
}

The code for the UIDUtil class is as follows:

public final class UIDUtil {
    private UIDUtil() { }

    public static String parseShardingKeyFromBacc(String bacc) {
        if (bacc == null) {
            throw new NullPointerException("bacc is null");
        }
        if ("".equals(bacc.trim())) {
            throw new IllegalArgumentException("bacc is empty");
        }
        return bacc.substring(0, 1) + "0";
    }
}
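
As a quick sanity check of this rule against the userIds in the prerequisites, the following standalone sketch repeats the same substring logic. The class name ShardingKeyExample is hypothetical and exists only so that the snippet compiles on its own.

```java
// Hypothetical standalone copy of the demo's rule: first digit of the userId plus "0".
final class ShardingKeyExample {
    private ShardingKeyExample() { }

    static String shardingKey(String userId) {
        if (userId == null || userId.trim().isEmpty()) {
            throw new IllegalArgumentException("userId is null or empty");
        }
        // Same logic as UIDUtil.parseShardingKeyFromBacc: take the first character and append "0".
        return userId.substring(0, 1) + "0";
    }
}
```

Calling ShardingKeyExample.shardingKey("080066600000002") returns 00, and calling it with 180066600000000 returns 10, which matches the prerequisites.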

When the transaction starts, you must register the custom routing logic with SOFARPC. The code is as follows:

com.alipay.sofa.rpc.ldc.LdcProviderManager.getInstance().registeLdcRouteProvider(new PocLdcRouteProvider());

In the withdrawal interface, add a UID parameter. The code is as follows:

@TwoPhaseBusinessAction(name = "pocDebitFirstAction", commitMethod = "commit", rollbackMethod = "rollback")
public AccountTransResult debit(String uid,
                                @BusinessActionContextParameter(isParamInProperty = true) AccountTransRequest accountTransRequest,
                                BusinessActionContext businessActionContext);

In the deposit interface, add a UID parameter. The code is as follows:

@TwoPhaseBusinessAction(name = "pocCreditFirstAction", commitMethod = "commit", rollbackMethod = "rollback")
public AccountTransResult credit(String uid,
                                 @BusinessActionContextParameter(isParamInProperty = true) AccountTransRequest accountTransRequest,
                                 BusinessActionContext businessActionContext);

For more information about SOFARPC unitization configuration, see Unitization configuration.

Message queue

This section uses a deposit and point accumulation scenario as an example. After a deposit is made to Account A, the points for Account A must be increased. In this scenario, the Message Queue (MQ) middleware sends a transactional message to notify the point center to perform the point increase operation.

Send a transactional message

The following code shows an example of how to send a transactional message:

    // Start the producer.
    public void afterPropertiesSet() throws Exception {
        MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").build();

        Properties properties = new Properties();
        // Replace GID_PGROUP with the actual Group ID that you created.
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_PGROUP");
        transactionProducer = accessPoint.createTransactionProducer(properties, new LocalTransactionChecker() {
            @Override
            public TransactionStatus check(Message msg) {
                return TransactionStatus.CommitTransaction;
            }
        });
        transactionProducer.start();
        LogUtil.info(LOGGER, "transaction producer started");
    }

    // Send the message.
    public void publishMessage(TxnFlowRequest request) {
        try {
            PointAcctDTO pointAcctDTO = buildPointReturnDTO(request);
            Message message = new Message(TOPIC, EVENT_CODE, hessianSerializer.serialize(pointAcctDTO));
            // Attach the sharding key so that the message is routed to the correct unit.
            String shardingKey = UIDUtil.parseShardingKeyFromBacc(request.getBacc());
            message.putUserProperties(UserPropKey.CELL_UID, shardingKey);
            transactionProducer.send(message, (msg, arg) -> {
                return TransactionStatus.CommitTransaction;
            }, null);
            LogUtil.info(LOGGER, "Published a message. TOPIC [{}] EVENTCODE [{}] id [{}] bacc [{}] payload [{}]",
                    message.getTopic(), EVENT_CODE, message.getMsgID(), request.getBacc(), request);
        } catch (Exception e) {
            LogUtil.error(LOGGER, e, "Failed to publish a message. TOPIC [{}] EVENTCODE [{}] bacc [{}] error [{}]",
                    TOPIC, EVENT_CODE, request.getBacc(), e.getMessage());
            throw new TxnFlowException(PropertyConstant.CODE_SYS_ERR, "call msgBroker error", e);
        }
    }

Receive a transactional message

The following code shows an example of how to receive a transactional message:

    // Start the consumer.
    @Override
    public void afterPropertiesSet() throws Exception {
        Properties properties = new Properties();
        // Replace GID_SGROUP with the actual Group ID that you created.
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_SGROUP");
        properties.setProperty(PropertyKeyConst.LDC_SUB_MODE, "RZONE");
        Consumer consumer = OMS.builder().driver("sofamq").build().createConsumer(properties);
        // Replace TP_TEST_POC with the actual message topic that you created.
        consumer.subscribe("TP_TEST_POC", "EC_ACCT_POINT", this);
        consumer.start();
    }

    // Consume the message.
    public Action consume(Message message, ConsumeContext context) {
        try {
            PointAcctDTO pointAcctDTO = hessianSerializer.deserialize(message.getBody(),
                    PointAcctDTO.class.getName());

            serviceTemplate.executeWithTransaction(pointAcctDTO, new ServiceCallback() {

                @Override
                public String getServiceName () {
                    return "returnPoint";
                }

                @Override
                public void checkParam () {

                    // Parameter verification. If the parameter is null or empty, the message does not need to be retried.
                    pointAcctDTO.checkParameters();
                }

                @Override
                public void checkIdempotent () {
                    // The message may be redelivered. If an order with the same transaction serial number
                    // already exists, the points have been updated, so treat the message as a duplicate.
                    List<TPointOrderDO> tPointOrderDOs = tPointOrderDAO.searchTxnSn(
                            pointAcctDTO.getBacc(), pointAcctDTO.getTxnSn());
                    if (tPointOrderDOs != null && tPointOrderDOs.size() != 0) {
                        throw PcException.valueOf(PcStatusCode.IDEMPOTENT);
                    }
                }

                @Override
                public void execute () {

                    // Instrumentation
                    mockActionServices.mockAction(ActionPointConstant.POINTCENTER_MESSAGE,
                            pointAcctDTO.getBacc());

                    List<TPotAcctDO> tPotAcctDOs = tPotAcctDAO.lockForUpdate(pointAcctDTO.getBacc());

                    // Exactly one point account record is expected. Otherwise, log an error and stop.
                    if (tPotAcctDOs == null || tPotAcctDOs.size() != 1) {
                        LogUtil.error(LOGGER, "error = {}, payload = {}",
                                PcStatusCode.BACC_COUNT_ERROR, "select t_pot_acct record:"
                                        + tPotAcctDOs);
                        return;
                    }
                    TPotAcctDO tPotAcctDO = tPotAcctDOs.get(0);

                    // If the account status is not ACC_STATUS_0, return without updating the points.
                    if (!PropertyConstant.ACC_STATUS_0.equals(tPotAcctDO.getStatus())) {
                        return;
                    }

                    tPotAcctDO.setPotBal(tPotAcctDO.getPotBal().add(pointAcctDTO.getPotBal()));
                    tPotAcctDO.setLastTxnSn(pointAcctDTO.getTxnSn());
                    tPotAcctDAO.update(tPotAcctDO);
                    TPointOrderDO tPointOrderDO = new TPointOrderDO(pointAcctDTO.getBacc(),
                            pointAcctDTO.getPotBal(), pointAcctDTO.getTxnSn());
                    tPointOrderDAO.insertOrder(tPointOrderDO);

                }
            });

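        } catch (CodecException e) {
            LogUtil.error(LOGGER, e, "Failed to deserialize the message body. id [{}]", message.getMsgID());
        }
        // Acknowledge the message. A message that fails deserialization is also acknowledged and is not retried.
        return Action.CommitMessage;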
    }
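
The idempotence check in checkIdempotent() can be illustrated in isolation with the sketch below. It is a simplified, in-memory stand-in under stated assumptions: the class IdempotentPointLedger and its methods are hypothetical, and the set and map replace the order and account tables that the demo accesses through tPointOrderDAO and tPotAcctDAO inside a database transaction.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the point order and point account tables.
final class IdempotentPointLedger {
    private final Set<String> processedOrders = ConcurrentHashMap.newKeySet();
    private final Map<String, BigDecimal> balances = new ConcurrentHashMap<>();

    /**
     * Adds points once per (account, transaction serial number) pair.
     *
     * @return true if the points were added, false if the message was a duplicate
     */
    boolean apply(String bacc, String txnSn, BigDecimal points) {
        // Set.add returns false when the key already exists, which indicates a redelivered message.
        if (!processedOrders.add(bacc + "|" + txnSn)) {
            return false;
        }
        balances.merge(bacc, points, BigDecimal::add);
        return true;
    }

    BigDecimal balanceOf(String bacc) {
        return balances.getOrDefault(bacc, BigDecimal.ZERO);
    }
}
```

Applying the same transaction serial number twice changes the balance only once. Unlike the demo, this sketch does not make the duplicate check and the balance update atomic, so it only shows the deduplication idea.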

Distributed transaction

In the Spring transaction template of the transaction initiator, you can call the dtxService.start() method to start a distributed transaction. When you start the transaction, you must add sharding parameters for the unitized architecture. These parameters are used for service routing and database routing.

The code is as follows:

Map<String, Object> properties = new HashMap<String, Object>();

// Start the distributed transaction.
String shardingKey = UIDUtil.parseShardingKeyFromBacc(request.getBacc());
dtxService.start("accttrans", request.getTxnSn(), shardingKey, properties);

For more information about distributed transaction unitization configuration, see Enable unitization.

Task scheduling

You do not need to perform additional configurations to use the unitized task scheduling feature. For more information, see Use a cross-zone gateway.

Note

Verify that the client startup parameter that specifies the logical unit is passed correctly: -Dcom.alipay.ldc.zone=xxx

API Gateway

You do not need to perform additional configurations to use the unitized routing feature of API Gateway. For more information, see Create a routing rule and Create an API.

Application parameter settings

When you deploy the application, you must also specify the following parameters as JVM system properties by using the -D option.

Middleware parameters

  • com.alipay.instanceid: The unique ID of the current tenant's middleware instance. You can obtain this ID from the Endpoint Configuration > Instance ID section on the Overview page of the Message Queue console.

  • com.antcloud.antvip.endpoint: The ACVIP address. You can obtain this address from the Endpoint Configuration > TCP Protocol Internal Network Endpoint section on the Overview page of the Message Queue console.

  • com.alipay.env: The environment identifier. Set the value to shared to indicate that the application runs in shared mode.

  • com.antcloud.mw.access: The access key for middleware access control.

  • com.antcloud.mw.secret: The secret key for middleware access control.

Active-active zone-disaster recovery parameters

  • com.alipay.ldc.zone: The unit name.

  • com.alipay.ldc.datacenter: The name of the physical data center.

LDC unitization parameters

  • zmode: The switch for LDC unitization. Set the value to true to enable LDC unitization.

  • com.alipay.ldc.strictmode: The switch for LDC unitization strict mode. Set the value to true to enable LDC unitization strict mode.
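
To catch a missing -D parameter early, an application could check the system properties at startup. The following is a minimal sketch, not part of the demo: the class LdcStartupCheck is hypothetical, and it assumes that every parameter listed above must be set, which may not hold for optional switches such as com.alipay.ldc.strictmode in your environment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical startup check: reports which of the listed -D parameters are not set.
final class LdcStartupCheck {
    // Property names are taken from the parameter lists above.
    // Assumption: all of them are treated as required.
    static final String[] REQUIRED = {
        "com.alipay.instanceid", "com.antcloud.antvip.endpoint", "com.alipay.env",
        "com.antcloud.mw.access", "com.antcloud.mw.secret",
        "com.alipay.ldc.zone", "com.alipay.ldc.datacenter",
        "zmode", "com.alipay.ldc.strictmode"
    };

    private LdcStartupCheck() { }

    /** Returns the names of the properties that are unset or blank. */
    static List<String> missingProperties() {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = System.getProperty(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

An application could call LdcStartupCheck.missingProperties() before it starts the middleware clients and fail fast if the returned list is not empty.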