全部产品
云市场

步骤三:开发单元化应用

更新时间:2020-04-01 19:17:04

在单元化架构中,您需要对本地应用进行开发改造,完成单元化功能配置。本文将基于转账、积分等场景分别介绍微服务(MS)中的 SOFARPC、消息队列(MQ)以及分布式事务(DTX)如何完成 LDC 单元化相关的业务开发。

前提条件

路由参数为 userId,格式如 080066600000002,取第一位 + 0 作为分片位(sharding key)
当 userId = 080066600000002 时,分片位(sharding key)= 0 + 0 = 00
当 userId = 180066600000000 时,分片位(sharding key)= 1 + 0 = 10

微服务中的 SOFARPC

按照 RPC 服务引用的标准使用规范,有如下要求:

  1. 接口入参的第一个参数为userId 路由信息。
  2. SOFARPC 默认从这个参数中提取和生成两位分片位(sharding key),默认提取 UID 的倒数二、三位,参见 com.alipay.sofa.rpc.ldc.DefaultLdcRouteProvider

在本 demo 中,因无法满足该规范,将自定义一个 PocLdcRouteProvider,实现提取两位分片位的逻辑,替代 DefaultLdcRouteProvider 的标准实现。

  1. import com.alipay.sofa.rpc.api.ldc.LdcRouteJudgeResult;
  2. import com.alipay.sofa.rpc.api.ldc.LdcRouteProvider;
  3. import com.alipay.sofa.rpc.common.utils.CommonUtils;
  4. import com.alipay.sofa.rpc.config.ConsumerConfig;
  5. import com.alipay.sofa.rpc.core.request.SofaRequest;
  6. import com.alipay.zoneclient.api.EnterpriseZoneClientHolder;
  7. public class PocLdcRouteProvider implements LdcRouteProvider {
  8. @Override
  9. public LdcRouteJudgeResult uidGenerator(ConsumerConfig consumerConfig, SofaRequest sofaRequest) {
  10. LdcRouteJudgeResult result = new LdcRouteJudgeResult();
  11. //如果没有开启ldc,那么就直接返回false
  12. if (!EnterpriseZoneClientHolder.isZoneMode()) {
  13. return result;
  14. }
  15. Object[] methodArgs = sofaRequest.getMethodArgs();
  16. if (CommonUtils.isEmpty(methodArgs)) {
  17. result.setSuccess(false);
  18. return result;
  19. }
  20. Object methodArg = methodArgs[0];
  21. if (methodArg instanceof String) {
  22. String routeUid = (String) methodArg;
  23. String uid = UIDUtil.parseShardingKeyFromBacc(routeUid);
  24. result.setSuccess(true);
  25. result.setRouteId(uid);
  26. return result;
  27. }
  28. result.setSuccess(false);
  29. return result;
  30. }
  31. @Override
  32. public int order() {
  33. return 2;
  34. }
  35. }

UIDUtil 类的代码如下:

  1. public final class UIDUtil {
  2. private UIDUtil() {
  3. }
  4. public static String parseShardingKeyFromBacc(String bacc) {
  5. if (bacc == null) {
  6. throw new NullPointerException("bacc is null");
  7. }
  8. if ("".equals(bacc.trim())) {
  9. throw new IllegalArgumentException("bacc is empty");
  10. }
  11. return bacc.substring(0, 1) + "0";
  12. }
  13. }

交易启动时,首先需要向 SOFARPC 注册定制的路由逻辑,代码如下:

  1. com.alipay.sofa.rpc.ldc.LdcProviderManager.getInstance().registeLdcRouteProvider(new PocLdcRouteProvider());

在取款接口中,添加一个 UID 参数,代码如下:

  1. @TwoPhaseBusinessAction(name = "pocDebitFirstAction", commitMethod = "commit", rollbackMethod = "rollback")
  2. public AccountTransResult debit(String uid, @BusinessActionContextParameter(isParamInProperty = true) AccountTransRequest accountTransRequest,
  3. BusinessActionContext businessActionContext);

在存款接口中,添加一个 UID 参数,代码如下:

  1. @TwoPhaseBusinessAction(name = "pocCreditFirstAction", commitMethod = "commit", rollbackMethod = "rollback")
  2. public AccountTransResult credit(String uid, @BusinessActionContextParameter(isParamInProperty = true) AccountTransRequest accountTransRequest,
  3. BusinessActionContext businessActionContext);

有关 SOFARPC 单元化配置的更多信息,参见 单元化配置

消息队列

此处假设一个存款加积分的场景:在账户 A 存入一笔钱后,需要增加账户 A 的积分。此场景下,需要消息队列(MQ)中间件通过一条发送事务消息,通知积分中心执行积分增加操作。

发送事务消息

发送事务消息代码示例如下:

  1. // 启动 producer
  2. public void afterPropertiesSet() throws Exception {
  3. MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").build();
  4. Properties properties = new Properties();
  5. //替换 GID_PGROUP 为您实际创建的 Group ID
  6. properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_PGROUP");
  7. transactionProducer = accessPoint.createTransactionProducer(properties, new LocalTransactionChecker() {
  8. @Override
  9. public TransactionStatus check(Message msg) {
  10. return TransactionStatus.CommitTransaction;
  11. }
  12. });
  13. transactionProducer.start();
  14. LogUtil.info(LOGGER, "transaction producer started");
  15. }
  16. // 发送消息
  17. public void publishMessage(TxnFlowRequest request) {
  18. try {
  19. PointAcctDTO pointAcctDTO = buildPointReturnDTO(request);
  20. Message message = new Message(TOPIC, EVENT_CODE, hessianSerializer.serialize(pointAcctDTO));
  21. String shardingKey = UIDUtil.parseShardingKeyFromBacc(request.getBacc());
  22. message.putUserProperties(UserPropKey.CELL_UID, shardingKey);
  23. transactionProducer.send(message, (msg, arg) -> {
  24. return TransactionStatus.CommitTransaction;
  25. }, null);
  26. LogUtil.info(LOGGER,"Public a message, success. TOPIC [{}] EVENTCODE [{}] id [{}] bacc [{}] payload [{}]",
  27. message.getTopic(), EVENT_CODE, message.getMsgID(), request.getBacc(), request);
  28. } catch (Exception e) {
  29. LogUtil.error(LOGGER, e,"Public a message, failure. TOPIC [{}] EVENTCODE [{}] bacc [{}] error [{}]",
  30. TOPIC, EVENT_CODE, request.getBacc(), e.getMessage());
  31. throw new TxnFlowException(PropertyConstant.CODE_SYS_ERR, "call msgBorker error", e);
  32. }
  33. }

接收事务消息

接收事务消息代码示例如下:

  1. // 启动 consumer
  2. @Override
  3. public void afterPropertiesSet() throws Exception {
  4. Properties properties = new Properties();
  5. //替换 GID_PGROUP 为您实际创建的 Group ID
  6. properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_SGROUP");
  7. properties.setProperty(PropertyKeyConst.LDC_SUB_MODE, "RZONE");
  8. Consumer consumer = OMS.builder().driver("sofamq").build().createConsumer(properties);
  9. //替换 TP_SCRCU_POC 为您实际创建的消息 Topic
  10. consumer.subscribe("TP_SCRCU_POC", "EC_ACCT_POINT", this);
  11. consumer.start();
  12. }
  13. // 消息
  14. public Action consume(Message message, ConsumeContext context) {
  15. try {
  16. PointAcctDTO pointAcctDTO = hessianSerializer.deserialize(message.getBody(),
  17. PointAcctDTO.class.getName());
  18. serviceTemplate.executeWithTransaction(pointAcctDTO, new ServiceCallback() {
  19. @Override
  20. public String getServiceName() {
  21. return "returnPoint";
  22. }
  23. @Override
  24. public void checkParam() {
  25. //参数校验,如果为null或者空则消息不用重试
  26. pointAcctDTO.checkParameters();
  27. }
  28. @Override
  29. public void checkIdempotent() {
  30. //消息有可能出现重发的情况,如果这次积分已经更新过了,则直接返回成功。 消息幂等处理有问题
  31. List<TPointOrderDO> tPointOrderDOs = tPointOrderDAO.searchTxnSn(
  32. pointAcctDTO.getBacc(), pointAcctDTO.getTxnSn());
  33. if (tPointOrderDOs != null && tPointOrderDOs.size() != 0) {
  34. throw PcException.valueOf(PcStatusCode.IDEMPOTENT);
  35. }
  36. }
  37. @Override
  38. public void execute() {
  39. //埋点
  40. mockActionServices.mockAction(ActionPointConstant.POINTCENTER_MESSAGE,
  41. pointAcctDTO.getBacc());
  42. List<TPotAcctDO> tPotAcctDOs = tPotAcctDAO.lockForUpdate(pointAcctDTO.getBacc());
  43. //根据原来代码逻辑,如果没找到则打日志;
  44. if (tPotAcctDOs == null || tPotAcctDOs.size() != 1) {
  45. LogUtil.error(LOGGER, "error = {}, payload = {}",
  46. PcStatusCode.BACC_COUNT_ERROR, "select t_pot_acct record:"
  47. + tPotAcctDOs);
  48. return;
  49. }
  50. TPotAcctDO tPotAcctDO = tPotAcctDOs.get(0);
  51. //根据原来代码逻辑,如果状态不为0则直接返回。
  52. if (!PropertyConstant.ACC_STATUS_0.equals(tPotAcctDO.getStatus())) {
  53. return;
  54. }
  55. tPotAcctDO.setPotBal(tPotAcctDO.getPotBal().add(pointAcctDTO.getPotBal()));
  56. tPotAcctDO.setLastTxnSn(pointAcctDTO.getTxnSn());
  57. tPotAcctDAO.update(tPotAcctDO);
  58. TPointOrderDO tPointOrderDO = new TPointOrderDO(pointAcctDTO.getBacc(),
  59. pointAcctDTO.getPotBal(), pointAcctDTO.getTxnSn());
  60. tPointOrderDAO.insertOrder(tPointOrderDO);
  61. }
  62. });
  63. } catch (CodecException e) {
  64. LogUtil.error(LOGGER,e,"consume pointAcctDTO={} exception.");
  65. }
  66. // do something
  67. return Action.CommitMessage;
  68. }

分布式事务

在事务发起方的 spring 事务模板内,调用 dtxService.start() 方法开启分布式事务,需要在开启时,增加单元化架构下的分片参数,用做服务路由和数据库路由。

代码示例如下:

  1. Map<String, Object> prooerties = new HashMap<String, Object>();
  2. // 开启分布式事务
  3. String shardingKey = UIDUtil.parseShardingKeyFromBacc(request.getBacc());
  4. dtxService.start("accttrans", request.getTxnSn(), shardingKey, prooerties);

有关分布式事务单元化配置的更多信息,参见 接入单元化能力

应用参数配置

在应用部署时,您还需要传入以下参数。各参数均通过 JVM 的 -D 参数传入到应用中。

中间件相关参数

  • com.alipay.instanceid:当前租户中间件实例唯一标识,可以在消息队列控制台概览页 接入配置 > 实例 ID 中获取。
  • com.antcloud.antvip.endpoint:ACVIP 地址,可以在消息队列控制台概览页 接入配置 > TCP 协议内网接入点 中获取。
  • com.alipay.env:环境标识,取值 shared,表示运行在共享模式。
  • com.antcloud.mw.access:中间件访问控制键值。
  • com.antcloud.mw.secret:中间件访问控制密钥。

同城双活相关参数

  • com.alipay.ldc.zone:单元名称。
  • com.alipay.ldc.datacenter:物理机房名称。

LDC 单元化相关参数

  • zmode:LDC 单元化开关。取值为 true 时,表示开启 LDC 单元化。
  • com.alipay.ldc.strictmode:LDC 单元化严格模式开关。取值为 true 时,表示开启 LDC 单元化严格模式。