消息处理时长自适应

本文介绍如何使消息处理时长自适应。

背景信息

轻量消息队列(原 MNS)的规范中,每条消息都有个默认的VisibilityTimeout,Worker在接收到消息后,Timeout就开始计时了。 如果Worker在Timeout时间内没能处理完消息,那么消息就有可能被其他Worker接收到并处理。

Timeout计时的好处在于消息处理完之后需要显式地DeleteMessage,那么如果Worker进程停止等情况发生,这条消息还有机会被其他Worker处理。

一些用户会将队列的默认VisibilityTimeout设置得比较长,以确保消息在被Worker处理完之前不会超时释放。

问题描述

队列的VisibilityTimeout是6个小时。

一个Worker接收到了消息M1,但是Worker在处理完消息之后,进程发生了Crash或者机器发生了重启。

那么M1这条消息至少在6个小时之后才会被另一个Worker接收到并处理。而自己写代码处理Failover的情况的话,程序又会变得比较复杂。

目标

在一些即时性要求比较高,并且又希望尽快响应每一条消息的场景下,用户会希望:

  • 队列的VisibilityTimeout比较短,例如5分钟。在发生了进程Crash后,最多5分钟,未处理完的消息就会被某个Worker接收到并处理。
  • Worker处理消息的过程中,耗时很有可能超过5分钟,那么消息在被处理的过程中不能超时。

解决方案

BestPractice的C# Demo可以实现这样的场景。Demo下载地址为Sample。具体的做法是,在Worker处理消息的过程中,为消息定期检查是否需要做ChangeVisibility,Worker处理完之后依然是主动deleteMessage。

遇到Demo使用问题,可提交工单处理。

下面是对于程序的几点说明:

  • 运行前需要填写accessId、accessKey、EndPoint。
  • 变量说明:
    • MessageMinimalLife是消息注册时必须有的最少的Life长度。需要这个的原因是,例如消息register的时候已经只剩下0.1秒的超时时间了,注册进来也来不及ChangeVisibility延长生命。所以,MessageMinimalLife是为了确保消息能活到被ChangeVisibility,用户可以自己根据业务压力来设置。
    • TimerInterval是Manager内部的Timer的Interval。只需要确保在Message到达MessageMinimalLife之前,Timer会被启动就足够了。时间可以设置得比较短(检查得就比较频繁)。
    • QueueMessageVisibilityTimeout是Sample里Message的默认超时时间,是Queue的属性。Sample里每次ChangeVisibility的时候,会把消息的VisibilityTimeout设置为QueueMessageVisibilityTimeout,所以它的值需要大于TimerInterval+ MessageMinimalLife,以确保消息不会超时。
    • MessageTimeout是Message在Manager里面的超时时间,例如某个Worker卡住了,消息在5个小时之后依然没有处理完毕(假设5个小时远远超出消息的正常处理时间),那么Manager就不会再为消息做ChangeVisibility了,会放任消息的Visibility超时。
  • 流程说明:
    • Worker在ReceiveMessage之后,会先做RegisterMessage,然后处理Message,最后再调用Manager的deleteMessage。
    • Manager在消息第一次注册进来之后,调用ThreadPool调度一个ChangeVisibilityTask检查是否需要ChangeVisibility,并且把Message加到内部的messages列表中。
    • Manager内部的Timer,会定时调用Parallel启动 ChangeVisibilityTask检查messages列表里的所有message。
    • Manager.ChangeMessageVisibility (ChangeVisibilityTask)相关的具体事情,在流程图里有显示。流程图如下:处理时长自适应