队列服务订阅推送

订阅推送可以将就绪的数据立即推送到客户端,降低通过Get API轮询造成的时延和通过队列服务查询带来的队列服务负载。然而,由于涉及较多额外概念,订阅推送具有一定的复杂性。本文将介绍队列服务订阅推送功能的使用。

消费者

消费者是指从队列服务中订阅数据的客户端程序,当客户端使用Watch API进行数据调用时,会在队列服务中生成消费者对象。API中的参数(如Window的大小、Tags),将作为消费者的属性。通过Attribute API查看消费者状态,示例如下:

[OK] Attributes: 
consumers.list.[0] : Id: default_group.u1, Index: 0, Pending: 0, Status: Complete, Idle: 2.091s, Window: 0, Slots: 0, AutoCommit: true
consumers.list.[1] : Id: default_group.u2, Index: 0, Pending: 0, Status: Complete, Idle: 1.124s, Window: 0, Slots: 0, AutoCommit: true
consumers.stats.total : 2

其中:consumers.stats.total表示消费者总数量。consumers.list是消费者列表,各列说明如下:

参数

说明

Id

为消费者的ID全称,格式为<消费者组Id.消费者Id>

Index

为当前消费者正在消费的数据index。

Pending

指示当前消费者正在处理,但没有进行Commit的数据数量。

Status

消费者的状态,主要的状态有:

  • Running:运行中。

  • Exit:长时间退出且有数据未消费。

  • Complete:退出且已完全消费。

  • Leaving:短时间退出。

Window

消费者窗口大小,即允许的最大数据推送数量。

Slots

窗口空闲数量,如果Slots0,则窗口已经占满。

AutoCommit

是否在数据发出后,自动Commit数据。

Tags

该消费者的tags过滤条件。

说明

当您使用带有tagswatch API时,需要确保同消费者组的消费者使用的tags都是相同的。

使用Watch API时,如果执行了数据的tag,则会看到额外的Tags列,例如:

consumers.list.[0] : Id: ..., Pending: 0, ..., Window: ..., Tags: tags[foo=bar]

表示该consumer关注的数据tags,只有当数据满足该条件时,数据才会送达该消费者。

消费者组

消费者组是以相同过滤条件订阅队列服务的消费者集合。同组内的消费者不可以同名,不同组内的消费者可以同名。

在同一消费者组内,数据均衡分发给各个消费者;在不同组间,数据并列推送给每个组的消费者。例如:

  • 同组内消费者会收到不同的数据。

  • 不同组的消费者会收到相同的数据。

重要

如果某个消费者通过API删除数据,该数据会立即删除,其他组内的消费者将无法收到。

您可以通过Attribute API看到队列服务中的消费者状态,示例如下:

groups.list.[0] : Id: default_group, Index: 0, Pending: 0, Delivered: 0, Consumers: 1
groups.list.[1] : Id: group, Index: 0, Pending: 0, Delivered: 1, Consumers: 0

groups.list是消费者列表,各列说明如下:

参数

说明

Id

为消费者组的ID。

Index

为当前消费者组正在消费的Index,为所有组内消费者最大的Index。

Pending

指示当前消费者组正在处理,但没有进行Commit的数据数量。

Delivered

以及推送出去的消息数量。

Consumers

消费者组内的消费数量。

消费者组数量没有上限,但不会自动清理,创建后状态会一直保留。

消费者与消费者组的使用

您可以在Watch API调用中通过HTTP Header声明所使用的消费者与消费者组,或者在各个语言的SDK中,在client初始化时进行声明。相关HTTP Headerkey也可以通过Attributes API进行查看。

meta.header.group : X-EAS-QueueService-Gid
meta.header.user : X-EAS-QueueService-Uid

通过X-EAS-QueueService-Uid,X-EAS-QueueService-Gid分别声明使用的消费者ID和加入的消费者组ID。

CommitNegative

队列服务支持CommitNegative两种消费方式,操作对象都是数据的Index,但语义不同。

  • Commit表示该消费者已收到数据并处理完毕,可推送下一批。

  • Negative表示消费者已经收到数据但无法处理,队列服务根据错误Code决定是否推送下一批。可以在Negative的同时以文本方式声明原因与错误Code,该数据会被推送给其他消费者。下表列出了队列服务能够处理的特殊错误Code:

    Code

    说明

    Shutdown

    表明该消费者正在执行退出,队列服务不会继续推送数据。

数据重平衡

在很多场景下,消费者无法进行数据Commit,比如:

  • 在预测服务滚动更新时,部分消费者被终止,导致正在处理的数据无法Commit。

  • 消费者遇到了某些内部错误导致崩溃。

  • 消费者无法处理收到的数据而执行Negative Commit。

这些无法处理的数据会被队列服务重新分发给其他消费者,这种机制称为数据重平衡。数据重平衡会在以下时间点发生:

  • 任一消费者进入Exit状态。

  • 消费者在窗口有空闲的情况下没有收到新的数据推送。

队列服务为每条数据维护投递计数器,每次数据执行重平衡且被分发出去后,计数器加一。当在重平衡过程中,发现某数据的投递计数器已经超过了最大投递次数,该数据被作为死信进行处理。队列服务会执行您配置的死信策略,默认情况下会将数据投递到尾队列。

尾队列

尾队列是用于存放不推送给消费者的数据(如死信或自定义的控制数据)的辅助队列。它是队列服务内的一个队列实例,具有相同的API。每个输入和输出队列都有一个尾队列。

重要

尾队列和普通队列共享最大队列长度。如果最大长度为10,普通队列占用6,则尾队列最多为4。若尾队列已满,再写入会返回队列过长错误。因此,需定期观察和清理尾队列。

API调用时您可以通过增加以下HTTP Header来声明访问尾队列:

X-EAS-QueueService-Access-Rear: true