订阅推送可以将就绪的数据立即推送到客户端,降低通过Get API轮询造成的时延和通过队列服务查询带来的队列服务负载。然而,由于涉及较多额外概念,订阅推送具有一定的复杂性。本文将介绍队列服务订阅推送功能的使用。
消费者
消费者是指从队列服务中订阅数据的客户端程序,当客户端使用Watch API进行数据调用时,会在队列服务中生成消费者对象。API中的参数(如Window的大小、Tags),将作为消费者的属性。通过Attribute API查看消费者状态,示例如下:
[OK] Attributes:
consumers.list.[0] : Id: default_group.u1, Index: 0, Pending: 0, Status: Complete, Idle: 2.091s, Window: 0, Slots: 0, AutoCommit: true
consumers.list.[1] : Id: default_group.u2, Index: 0, Pending: 0, Status: Complete, Idle: 1.124s, Window: 0, Slots: 0, AutoCommit: true
consumers.stats.total : 2
其中:consumers.stats.total表示消费者总数量。consumers.list是消费者列表,各列说明如下:
参数 | 说明 |
Id | 为消费者的ID全称,格式为 |
Index | 为当前消费者正在消费的数据index。 |
Pending | 指示当前消费者正在处理,但没有进行Commit的数据数量。 |
Status | 消费者的状态,主要的状态有:
|
Window | 消费者窗口大小,即允许的最大数据推送数量。 |
Slots | 窗口空闲数量,如果Slots为0,则窗口已经占满。 |
AutoCommit | 是否在数据发出后,自动Commit数据。 |
Tags | 该消费者的tags过滤条件。 说明 当您使用带有tags的watch API时,需要确保同消费者组的消费者使用的tags都是相同的。 |
使用Watch API时,如果执行了数据的tag,则会看到额外的Tags列,例如:
consumers.list.[0] : Id: ..., Pending: 0, ..., Window: ..., Tags: tags[foo=bar]
表示该consumer关注的数据tags,只有当数据满足该条件时,数据才会送达该消费者。
消费者组
消费者组是以相同过滤条件订阅队列服务的消费者集合。同组内的消费者不可以同名,不同组内的消费者可以同名。
在同一消费者组内,数据均衡分发给各个消费者;在不同组间,数据并列推送给每个组的消费者。例如:
同组内消费者会收到不同的数据。
不同组的消费者会收到相同的数据。
如果某个消费者通过API删除数据,该数据会立即删除,其他组内的消费者将无法收到。
您可以通过Attribute API看到队列服务中的消费者状态,示例如下:
groups.list.[0] : Id: default_group, Index: 0, Pending: 0, Delivered: 0, Consumers: 1
groups.list.[1] : Id: group, Index: 0, Pending: 0, Delivered: 1, Consumers: 0
groups.list是消费者列表,各列说明如下:
参数 | 说明 |
Id | 为消费者组的ID。 |
Index | 为当前消费者组正在消费的Index,为所有组内消费者最大的Index。 |
Pending | 指示当前消费者组正在处理,但没有进行Commit的数据数量。 |
Delivered | 以及推送出去的消息数量。 |
Consumers | 消费者组内的消费数量。 |
消费者组数量没有上限,但不会自动清理,创建后状态会一直保留。
消费者与消费者组的使用
您可以在Watch API调用中通过HTTP Header声明所使用的消费者与消费者组,或者在各个语言的SDK中,在client初始化时进行声明。相关HTTP Header的key也可以通过Attributes API进行查看。
meta.header.group : X-EAS-QueueService-Gid
meta.header.user : X-EAS-QueueService-Uid
通过X-EAS-QueueService-Uid,X-EAS-QueueService-Gid分别声明使用的消费者ID和加入的消费者组ID。
Commit与Negative
队列服务支持Commit与Negative两种消费方式,操作对象都是数据的Index,但语义不同。
Commit表示该消费者已收到数据并处理完毕,可推送下一批。
Negative表示消费者已经收到数据但无法处理,队列服务根据错误Code决定是否推送下一批。可以在Negative的同时以文本方式声明原因与错误Code,该数据会被推送给其他消费者。下表列出了队列服务能够处理的特殊错误Code:
Code
说明
Shutdown
表明该消费者正在执行退出,队列服务不会继续推送数据。
数据重平衡
在很多场景下,消费者无法进行数据Commit,比如:
在预测服务滚动更新时,部分消费者被终止,导致正在处理的数据无法Commit。
消费者遇到了某些内部错误导致崩溃。
消费者无法处理收到的数据而执行Negative Commit。
这些无法处理的数据会被队列服务重新分发给其他消费者,这种机制称为数据重平衡。数据重平衡会在以下时间点发生:
任一消费者进入Exit状态。
消费者在窗口有空闲的情况下没有收到新的数据推送。
队列服务为每条数据维护投递计数器,每次数据执行重平衡且被分发出去后,计数器加一。当在重平衡过程中,发现某数据的投递计数器已经超过了最大投递次数,该数据被作为死信进行处理。队列服务会执行您配置的死信策略,默认情况下会将数据投递到尾队列。
尾队列
尾队列是用于存放不推送给消费者的数据(如死信或自定义的控制数据)的辅助队列。它是队列服务内的一个队列实例,具有相同的API。每个输入和输出队列都有一个尾队列。
尾队列和普通队列共享最大队列长度。如果最大长度为10,普通队列占用6,则尾队列最多为4。若尾队列已满,再写入会返回队列过长错误。因此,需定期观察和清理尾队列。
在API调用时您可以通过增加以下HTTP Header来声明访问尾队列:
X-EAS-QueueService-Access-Rear: true