严格保序队列

更新时间:
复制为 MD 格式

本文介绍如何使用队列实现按顺序发送和消费消息。

背景信息

轻量消息队列(原 MNS)提供的队列(Queue)主要的特点是高可靠、高可用、高并发。每个队列的数据都会被持久化三份到阿里云的飞天分布式平台。其中每个队列至少有两台服务器向外提供服务,同时每台服务器都支持高并发访问。这些分布式特性导致了轻量消息队列(原 MNS)的队列无法像传统单机队列严格保证消息FIFO,只能做到基本有序。

队列如果同时有多个消息发送者(Sender),由于并发和网络延迟等问题,根本无法获知消息的实际发送顺序和消息到达服务器端的真实顺序。同理,当有多个接收者并发接收消息时,其真正的处理顺序也不可获知。

综上所述,只有一个发送者(一个进程,可以是多个线程)和一个接收者时,消息顺序才有意义,也只有在这种情况下才能感知和记录消息的真实发送和接收顺序。

解决方案

基于上述假设,同时为了满足部分用户对于消息消费顺序性的要求,设计了以下方案,确保消息按照用户发送顺序被接收和消费。

  1. 消息在发送端进行染色,加上SeqId(例如:#num#)。

  2. 消息在接收端进行还原,并根据SeqId排序后返回给上层,同时对于已经接收的消息会有后台线程保证消息不会被重复消费。

  3. 为了避免因为发送者或者接收者失败导致SeqId丢失。SeqId会被持久存储到本地磁盘文件,或者其他存储和数据库,例如OSS、OTSRDS。流程图

注意事项

  • 本文的主要目的是展示顺序消息的解决方案,不建议不加测试直接用于生产环境。

  • 正常情况下,发送端和接收端的SeqId应该和队列中的消息(染色)匹配。当出现删除队列重新创建等操作时,请注意磁盘文件中的SeqId是否和队列中的真实情况相符,同时建议不要往染色的消息队列里发送非染色消息。

  • 队列的消息有效期设置过短或者每条消息的实际处理结果都有可能对消息有序性造成影响,在您的程序中需要对这些情况所导致的乱序现象进行处理。

示例代码

本文以Python版的方案实现为例,下载地址:有序队列Python示例代码(依赖MNS Python SDK)。其中,主要提供了OrderedQueueWrapper类(ordered_queue.py文件),可以将普通队列包装成有序队列。

OrderedQueueWrapper提供SendMessageInOrder()和ReceiveMessageInOrder()方法:

  • 发送时对消息进行染色。

  • 接收时还原消息,并且按顺序返回给接收者。

另外,send_message_in_order.pyreceive_message_in_order.py提供发送者和接收者使用OrderedQueueWrapper的程序示例。

send_message_in_order.py:
    #init orderedQueue
    seqIdConfig = {"localFileName":"/tmp/mns_send_message_seq_id"}   # 指定持久化发送SeqId的磁盘文件。
    seqIdPS = LocalDiskStorage(seqIdConfig)
    orderedQueue = OrderedQueueWrapper(myQueue, sendSeqIdPersistStorage = seqIdPS)
    orderedQueue.SendMessageInOrder(message)
receive_message_in_order.py:
    #init orderedQueue
    seqIdConfig = {"localFileName":"/tmp/mns_receive_message_seq_id"} # 指定持久化接收SeqId的磁盘文件。
    seqIdPS = LocalDiskStorage(seqIdConfig)
    orderedQueue = OrderedQueueWrapper(myQueue, receiveSeqIdPersistStorage = seqIdPS)
    recv_msg = orderedQueue.ReceiveMessageInOrder(wait_seconds)

运行方法

  1. 配置send_message_in_order.pyreceive_message_in_order.py中的配置项g_endpointg_accessKeyIdg_accessKeySecret以及g_testQueueName

  2. 运行send_message_in_order.py

    python send_message_in_order.py
  3. 运行receive_message_in_order.py

    python receive_message_in_order.py

    发送程序会发送20条消息,接收程序会按顺序消费这20条消息。

    orderedqueuewrapper receive message body is 0
    orderedqueuewrapper receive message body is 1
    orderedqueuewrapper receive message body is 2
    orderedqueuewrapper receive message body is 3
    orderedqueuewrapper receive message body is 4
    orderedqueuewrapper receive message body is 5
    orderedqueuewrapper receive message body is 6
    orderedqueuewrapper receive message body is 7
    orderedqueuewrapper receive message body is 8
    orderedqueuewrapper receive message body is 9
    orderedqueuewrapper receive message body is 10
    orderedqueuewrapper receive message body is 11
    orderedqueuewrapper receive message body is 12
    orderedqueuewrapper receive message body is 13
    orderedqueuewrapper receive message body is 14
    orderedqueuewrapper receive message body is 15
    orderedqueuewrapper receive message body is 16
    orderedqueuewrapper receive message body is 17
    orderedqueuewrapper receive message body is 18
    orderedqueuewrapper receive message body is 19
    Finish receive 20 messages

运行ordered_queue.py的测试结果对比普通队列,运行命令如下:

python ordered_queue.py
说明

ordered_queue.py需配置EndpointAccessKey。

运行结果

运行结果如下:

  • 非严格有序

    Test with common queue:
    Send Message Succeed with commont queue. message body is :1
    Send Message Succeed with commont queue. message body is :2
    Send Message Succeed with commont queue. message body is :3
    Send Message Succeed with commont queue. message body is :4
    Send Message Succeed with commont queue. message body is :5
    Send Message Succeed with commont queue. message body is :6
    Send Message Succeed with commont queue. message body is :7
    Send Message Succeed with commont queue. message body is :8
    Send Message Succeed with commont queue. message body is :9
    Send Message Succeed with commont queue. message body is :10
    Receive Message Succeed with common queue! message body is 1
    Receive Message Succeed with common queue! message body is 3
    Receive Message Succeed with common queue! message body is 2
    Receive Message Succeed with common queue! message body is 5
    Receive Message Succeed with common queue! message body is 4
    Receive Message Succeed with common queue! message body is 7
    Receive Message Succeed with common queue! message body is 6
    Receive Message Succeed with common queue! message body is 9
    Receive Message Succeed with common queue! message body is 8
    Receive Message Succeed with common queue! message body is 10

    整体有序,部分相邻消息无序,说明单个队列有多个服务器在同时服务。

  • 严格有序 消息按发送顺序依次接收,说明 OrderedQueueWrapper 保证了消息的有序性。

    Test with OrderedQueueWrapper:
    orderedqueuewrapper: send message body 1
    orderedqueuewrapper: send message body 2
    orderedqueuewrapper: send message body 3
    orderedqueuewrapper: send message body 4
    orderedqueuewrapper: send message body 5
    orderedqueuewrapper: send message body 6
    orderedqueuewrapper: send message body 7
    orderedqueuewrapper: send message body 8
    orderedqueuewrapper: send message body 9
    orderedqueuewrapper: send message body 10
    orderedqueuewrapper: receive message body is 1
    orderedqueuewrapper: receive message body is 2
    orderedqueuewrapper: receive message body is 3
    orderedqueuewrapper: receive message body is 4
    orderedqueuewrapper: receive message body is 5
    orderedqueuewrapper: receive message body is 6
    orderedqueuewrapper: receive message body is 7
    orderedqueuewrapper: receive message body is 8
    orderedqueuewrapper: receive message body is 9
    orderedqueuewrapper: receive message body is 10