全部产品
云市场

事务型异步消息说明

更新时间:2019-09-20 20:47:29

发送事务型异步消息

事务型消息指发布消息的应用系统在本地数据库事务操作序列中发送的消息。此类消息的投递与数据库事务状态保持一致,当事务状态为“提交”时,消息会被投递到订阅者;当事务状态为“回滚”时,消息不会被投递到订阅者。

事务型消息发布者(Publisher)配置实例如下所示:

  1. <sofa:publisher id="mqPublisher" group="P_appname_service">
  2. <sofa:channels tx-callback="txCallbackListener">
  3. <sofa:channel value="TP_DEFAULT"/>
  4. </sofa:channels>
  5. <sofa:binding.msg_broker/>
  6. </sofa:publisher>
  7. <bean id="txCallbackListener" class="com.antcloud.tutorial.mq.endpoint.service.TxCallbackListener" />

事务型消息配置必须设置 tx-callback 属性值 txCallbackListener,此实现类必须实现 com.alipay.common.event.UniformEventTxSynMessageListener 接口,以作为事务型消息回查接口。

当消息代理组件无法确定事务型消息是处于“提交”状态还是“回滚”状态时,会主动调用消息发布者实现的事务型消息回查接口。这种回查属于异常场景,不是每次发送事务型消息都会触发。事务回查接口实现实例如下所示:

  1. public class TxCallbackListener implements UniformEventTxSynMessageListener {
  2. @Override
  3. public void onUniformEventTxSynchronized(UniformEvent message, UniformEventContext uContext) {
  4. try {
  5. if (!txMessageCheck(message)) {
  6. uContext.setRollbackOnly(); // 设置状态为回滚
  7. }
  8. } catch (Exception e) {
  9. throw e; // 抛出异常
  10. }
  11. }
  12. private boolean txMessageCheck(UniformEvent message) {
  13. return false;
  14. }
  15. }

发布端应用系统接收到事务型消息回查后可能存在以下三种处理策略:

  • 设置状态为“回滚”,对应的消息不会投递到订阅者。
  • 抛出异常,事务型消息回查会后续重新执行。
  • onUniformEventTxSynchronized 正常执行完毕,状态为“提交”,对应的消息投递到订阅者。

事务型消息发布者服务实现与 快速入门 文档中描述的普通消息发布者服务实现有以下两点不同:

  • 必须设置 uniformEvent.setTransactional(true)
  • 必须在 Spring 事务模板中发送消息。具体实现代码如下所示:

    1. public class UniformEventPublisherService {
    2. private static final Logger logger = LoggerFactory.getLogger(UniformEventPublisherService.class);
    3. private UniformEventPublisher uniformEventPublisher;
    4. private UniformEventBuilder uniformEventBuilder = new DefaultUniformEventBuilder();
    5. private TransactionTemplate transactionTemplate;
    6. // 发布事务型消息,消息类型由 topic/eventcode 指定,业务对象作为消息 payload。
    7. public boolean publicTransactionUniformEvent(String topic, String eventcode, Object payload) {
    8. // 创建消息,第一个参数是 topic, 第二个参数是 eventcode
    9. final UniformEvent uniformEvent = uniformEventBuilder.buildUniformEvent(topic, eventcode);
    10. // 设置消息负载,一般为业务对象
    11. uniformEvent.setEventPayload(payload);
    12. // transactional 为 true 代表事务型消息
    13. uniformEvent.setTransactional(true);
    14. transactionTemplate.execute(new TransactionCallback() {
    15. @Override
    16. public Object doInTransaction(TransactionStatus status) {
    17. try {
    18. uniformEventPublisher.publishUniformEvent(uniformEvent);
    19. } catch (Exception e) {
    20. // 事务型消息状态与本地事务一同回滚
    21. status.setRollbackOnly();
    22. }
    23. return null;
    24. }
    25. });
    26. return true;
    27. }
    28. public void setTransactionTemplate(TransactionTemplate transactionTemplate) {
    29. this.transactionTemplate = transactionTemplate;
    30. }
    31. }