本文介绍如何使用MNS队列实现按顺序发送和消费消息。

背景信息

MNS提供的队列(Queue)主要特点是高可靠、高可用、高并发。每个队列的数据都会被持久化三份到阿里云的飞天分布式平台;其中每个队列至少有2台服务器向外提供服务;同时每台服务器都支持高并发访问。这些分布式特性,也导致了MNS的队列无法像传统单机队列那样保证严格的消息FIFO特点,只能做到基本有序。

队列如果同时有多个消息发送者(Sender),由于并发和网络延迟不一等问题,消息的严格顺序本身就失去了意义,因为在这种情况下,我们根本无法获知消息在多个sender上的实际发送顺序和消息到达服务器端的真实顺序。同理,当有多个接收者并发接收消息时,其真正的处理顺序也不可获知。

综上所述,只有一个发送者(一个进程,可以是多个线程)、 一个接收者时,消息顺序才有意义,也只有在这种情况下才能感知和记录消息的真实发送和接收顺序。

解决方案

基于上述假设,同时为了满足部分用户对于消息消费顺序性的要求,设计了以下方案,确保消息按照用户发送顺序被接收和消费。

  1. 消息在发送端进行染色,加上SeqId(例如:#num#)。
  2. 消息在接收端进行还原,并根据SeqId排序后返回给上层,同时对于已经接收的消息会有后台线程保证消息不会被重复消费。
  3. 为了避免因为发送者fail,或者接收者fail,导致SeqId丢失。SeqId会被持久存储到本地磁盘文件。当然也可以存储到其他存储或数据库,例如OSS、OTS或RDS。严格保序

注意事项

  • 本文的主要目的是展示顺序消息的解决方案,示例代码未经过严格测试,不建议不加测试直接用于生产环境。同时程序仓促完成,难免有瑕疵,欢迎指正。
  • 正常情况下,发送端和接收端的SeqId应该和队列中的消息(染色)匹配。当出现删除队列重新创建等操作时,请注意磁盘文件中的SeqId是否和队列中的真实情况相符,同时建议不要往染色的消息队列里发送非染色消息。
  • 队列的消息有效期设置过短或者每条消息的实际处理结果都有可能会对消息有序性造成影响,在您的程序中需要对这些情况所导致的乱序现象进行处理。

程序说明

本文以Python版的方案实现为例,下载地址:有序队列Python示例代码(依赖MNS Python SDK)。其中,主要提供了OrderedQueueWrapper类(oredered_queue.py文件),可以将MNS的普通队列包装成有序队列。

OrderedQueueWrapper提供SendMessageInOrder()和ReceiveMessageInOrder()方法:

  • send中对消息进行染色
  • receive还原消息,并且按顺序返回给接收者。

另外,send_message_in_order.pyreceive_message_in_order.py提供发送者和接收者使用OrderedQueueWrapper的程序示例。

send_message_in_order.py:
    #init orderedQueue
    seqIdConfig = {"localFileName":"/tmp/mns_send_message_seq_id"}   # 指定持久化发送SeqId的磁盘文件。
    seqIdPS = LocalDiskStorage(seqIdConfig)
    orderedQueue = OrderedQueueWrapper(myQueue, sendSeqIdPersistStorage = seqIdPS)
    orderedQueue.SendMessageInOrder(message)

receive_message_in_order.py:
    #init orderedQueue
    seqIdConfig = {"localFileName":"/tmp/mns_receive_message_seq_id"} #指定持久化接收SeqId的磁盘文件
    seqIdPS = LocalDiskStorage(seqIdConfig)
    orderedQueue = OrderedQueueWrapper(myQueue, receiveSeqIdPersistStorage = seqIdPS)
    recv_msg = orderedQueue.ReceiveMessageInOrder(wait_seconds)

运行方法

  1. 配置send_message_in_order.pyreceive_message_in_order.py中的配置项g_endpointg_accessKeyIdg_accessKeySecret以及g_testQueueName
  2. 运行send_message_in_order.py
  3. 运行receive_message_in_order.py(可以不用等send_message_in_order.py程序运行完成)。发送程序会发送20条消息,接收程序会按顺序消费这20条消息。顺序消费

也可以运行oredered_queue.py(需配置Endpoint和AccessKey)的测试用例对比与普通MNS队列的区别,运行命令如下:

$python oredered_queue.py
  • 非严格有序:整体有序,部分相邻消息无序,同时侧面证明MNS的单个队列同时有多个服务器在提供服务非严格有序
  • 严格有序严格有序