本文介绍云消息队列 RocketMQ 版中轻量主题(LiteTopic)的定义、模型关系、内部属性、行为约束、版本兼容性及使用建议。
前提条件
轻量主题目前只有非Serverless系列(包年包月、按量付费)和Serverless系列的独享实例支持。
提交工单申请LiteTopic模型功能白名单,提单时需要提供购买实例的主账号uid和实例所属地域。
定义
轻量主题是云消息队列 RocketMQ 版中消息传输和存储的二级容器,用于标识同一类业务逻辑下不同子类(例如不同会话、任务等粒度)的消息。
轻量主题的作用主要如下:
实现排他性消费,定义数据的二级隔离
建议将不同子类的数据拆分到不同的轻量主题中管理,通过轻量主题实现更细致的存储隔离性和订阅隔离性。
定义数据的身份和权限
在基于主题的身份识别和权限管理基础上,可以通过轻量主题,进一步细分用户的身份与权限。
模型关系
在整个云消息队列 RocketMQ 版的领域模型中,轻量主题所处的流程和位置如下:

主题(Topic)是云消息队列 RocketMQ 版中消息传输和存储的顶层容器。当类型为Lite类型时,Topic下可创建轻量主题(LiteTopic),由Topic和LiteTopic共同唯一确认消息的存储容器。
当类型为Lite类型时,每个存储容器默认由一个队列组成。
内部属性
轻量主题名称
定义:轻量主题的名称,用于标识轻量主题,轻量主题名称在所属主题内全局唯一。
取值:当主题类型为Lite时,用户对message进行了setLiteTopic,如对应的轻量主题不存在,系统会自动创建。
约束:请参见参数限制。
过期时间
定义:轻量主题的过期时间,当轻量主题距离最近一次消息写入时间超过过期时间后,该轻量主题会被自动删除。删除是指释放轻量主题占用的个数,即占用总数-1。
取值:当创建主题类型为Lite时,可以设置过期时间 expiration 值。
约束:请参见参数限制。
版本兼容性
服务端版本:5.0-rmq-20251024-1 版本及以上
客户端版本:RocketMQ gRPC 5.1.0 版本及以上
轻量类型和普通类型主题的差异
场景 | 对比项 | 轻量类型主题 | 普通类型主题 |
消息存储 | 一级主题 | 相同。都需要预先创建主题资源。 | |
二级主题 | 可以在Topic下创建百万量级的二级主题资源LiteTopic,该二级资源有许多新特性。 | 无二级主题资源。 | |
自动化生命周期管理 | 二级主题LiteTopic的生命周期可以自动化管理:
| 无 | |
顺序性 | 每个LiteTopic只创建一个队列,同一个队列中的消息存储是顺序的。
| 会创建多个队列,只有分区顺序Topic | |
收发并发TPS上限 | 由于只有一个队列,每个LiteTopic 的TPS上限是有限的。 但Topic下可以创建百万量级的LiteTopic,总TPS的上限是可以根据LiteTopic的增加而增加。 | Topic的TPS可以根据队列数量和集群机器节点数量横向扩容。 | |
消息消费 | 订阅关系一致性 | 可以不一致。 同一Group下,每个消费者可订阅不同的LiteTopic集合,Group的限制弱化。 | 需要一致。 同一Group下,每个消费者的订阅关系需要保持一致,共享目标主题下的消息。 |
顺序性 | 顺序消费,一个LiteTopic下的消息只能被一个消费者线程处理。 | 可选择并发消费或顺序消费。 | |
动态订阅 | 每个消费者可以动态增加或删除指定某个LiteTopic的订阅 | 无 | |
单个消费者可以订阅的LiteTopic的数量 | 每个消费者可以订阅千量级的LiteTopic | 无 | |
可观测 | Metrics指标数据 | 有消息堆积量指标 无消息处理滞后时间指标 | 有消息堆积量指标 有消息处理之后时间指标 |
消息轨迹 | 相同 | ||
常见轻量主题应用场景
应用场景 1:Multi-Agent 的异步通信,解决长耗时调用阻塞痛点
随着AI需求场景变得更加复杂,大多数单Agent在复杂场景中面临着局限性:缺乏专业化分工、难以对多领域进行整合;无法实现动态协作决策。单Agent应用和单Agent工作流会逐步转向 Multi-Agent 应用。但因为 AI 任务长耗时的特点,同步调用会造成调用者的线程阻塞,存在大规模协作扩展性问题。

如上图所示 Multi-Agent的工作流程为:Supervisor Agent 负责将需求拆分给两个子Agent;两个子Agent负责各自领域问题解答并将结果返回给Supervisor Agent; Supervisor Agent将结果汇总返回给Web端。使用 RocketMQ 异步通信方案流程如下:
在接收请求的流程中
为每个子Agent创建一个Topic (Request)用于请求任务的缓冲队列,可以是优先级Topic,能先处理高优任务。
Supervisor Agent 将拆分任务信息发送到对应的请求主题中。
在返回响应结果流程中
Supervisor Agent 创建 Lite 类型的Topic (Response) 并订阅这个Topic。
子 Agent 处理将每个任务的响应结果发送到Topic (Response) 的LiteTopic中。LiteTopic可以用任务ID命名,为每个任务创建一个专属的LiteTopic。
Supervisor Agent 通过订阅实时获取结果,然后通过HTTP SSE协议推送给Web端。
应用场景 2:分布式会话状态管理,终结 AI 应用的会话状态管理难题
AI 应用的交互模式具有特殊性,即长耗时、多轮次且高度依赖高成本计算的会话。当应用依赖 SSE 等长连接时,一旦连接中断(如网关重启、连接超时、网络不稳定触发),不仅会导致当前会话上下文的丢失,更会直接造成已投入的 AI 任务作废,从而浪费宝贵的算力资源。

如上图所示,和场景 1的返回响应结果流程相同,使用Lite 类型的Topic作为实时通知响应结果。这里每个LiteTopic可以使用 SessionID 命名(例如 chatbot/{sessionID}),这样会话结果都作为消息在这个主题中有序传递。长连接重连后继续保持会话的连续性的解决方案如下:
Web端2 和应用服务节点 1 建立长链接,创建会话Session2。
应用服务节点 1 监听 LiteTopic [chat/SessionID2]
大模型任务调度组件,根据请求的SessionID信息,将返回结果发送到 LiteTopic [chat/SessionID2]
由于网络等异常,WebSocket 自动重连到了应用服务节点2。
应用服务节点 1 取消订阅LiteTopic [chat/SessionID2],应用服务节点 2 订阅LiteTopic [chat/SessionID2]。
LiteTopic [chat/SessionID2] 根据之前的消费进度,将后续未消费的消息继续推送给应用服务节点 2。保证了会话状态和会话数据的连续性。
示例代码
完整示例请查看RocketMQ 5.x gRPC SDK中的示例代码。
发送消息
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(clientConfiguration)
.build();
final Message message = provider.newMessageBuilder()
.setTopic(topic)
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置LiteTopic
.setLiteTopic("lite-topic-1")
//消息体
.setBody("messageBody".getBytes())
.build();
try {
final SendReceipt sendReceipt = producer.send(message);
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (LiteTopicQuotaExceededException e) {
// LiteTopic配额超限,评估并增加配额
log.error("Lite topic quota exceeded", e);
} catch (Throwable t) {
log.error("Failed to send message", t);
}
消费消息
需要使用 LitePushConsumer 消费类:
//初始化LitePushConsumer,需要绑定消费者分组ConsumerGroup、目标主题Topic、通信参数等。
LitePushConsumer litePushConsumer = provider.newLitePushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 控制台创建ConsumerGroup时绑定的Topic
.bindTopic(topicName)
//设置消费者分组
.setConsumerGroup(consumerGroup)
.setMessageListener(messageView -> {
//处理消息并返回消费结果。
LOGGER.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
try {
// 订阅感兴趣的LiteTopic集合
litePushConsumer.subscribeLite("lite-topic-1");
litePushConsumer.subscribeLite("lite-topic-2");
litePushConsumer.subscribeLite("lite-topic-3");
} catch (LiteSubscriptionQuotaExceededException e) {
// LiteTopic配额超限,评估并增加配额
log.error("Lite subscription quota exceeded", e);
} catch (Throwable t) {
log.error("Failed to subscribe lite topic", t);
}
// 业务处理完毕后,及时取消不再使用的LiteTopic订阅
litePushConsumer.unsubscribeLite("lite-topic-3");
// 获取当前订阅的LiteTopic集合
Set<String> liteTopicSet = litePushConsumer.getLiteTopicSet();
动态修改订阅关系
/**
* 动态添加订阅关系
* subscribeLite() 方法会发起网络请求并执行配额验证,因此可能会调用失败。
* 请务必检查此调用的结果,以确保订阅被成功添加。
* 可能的失败场景包括:
* 1. 网络请求错误,可以重试。
* 2. 配额验证失败,抛出 LiteSubscriptionQuotaExceededException 异常。
* 请评估配额是否满足需求,并及时使用 unsubscribeLite() 取消订阅不再使用的主题以释放资源。
*/
litePushConsumer.subscribeLite("lite-topic-1");
//动态删除订阅关系
litePushConsumer.unsubscribeLite("lite-topic-1");
使用限制
单个消费者能订阅的LiteTopic数量上限为2000个(可提工单调整)。
单个LiteTopic的消费TPS为上限200TPS。
为保障服务稳定性,单个实例对可创建或可订阅的 LiteTopic 数量设置了上限。具体配额(可提工单调整)请参见下表。
LiteTopic 创建数量
定义:指单个实例在其生命周期内,当前已创建并存在的 LiteTopic 总量。
触发条件与影响:当此数量达到上限时,若客户端尝试向一个尚未存在的 LiteTopic 发送消息(该操作会触发自动创建),系统将无法创建该 LiteTopic,并返回消息发送失败的错误。
LiteTopic 订阅数量
定义:指实例下所有在线的消费者客户端与 LiteTopic 建立的有效订阅关系的总和。这是一个动态变化的数值。
影响:当此数量达到上限时,任何消费者客户端尝试订阅一个新的 LiteTopic 都会失败,无法建立新的消费关系。
特殊规则:请注意,即使一个 LiteTopic 已从系统中删除,但若仍有消费者客户端保持对其的订阅关系,该订阅关系依然会被计入订阅总数,直至消费者停止订阅。
Serverless系列实例
部署架构 | 容量模式 | 规格 | 可创建或订阅LiteTopic的最大数量 |
独享 | 预留+弹性 | 5000 | 30万 |
10000 | 60万 | ||
15000 | 72万 | ||
[2万, 5万] | 100万 | ||
(5万, 10万] | 150万 | ||
(10万, 20万] | 240万 | ||
(20万, 30万] | 470万 | ||
(30万, 50万] | 630万 | ||
(50万, 100万] | 1160万 |
非Serverless系列(包年包月、按量付费)实例
标准版
实例规格 | 消息收发基础规格TPS上限(次/秒) | 可创建或订阅LiteTopic的最大数量 |
rmq.s2.2xlarge | 2000 | 15万 |
rmq.s2.4xlarge | 4000 | 25万 |
rmq.s2.6xlarge | 6000 | 30万 |
专业版
实例规格 | 消息收发基础规格TPS上限(次/秒) | 可创建或订阅LiteTopic的最大数量 |
rmq.p2.2xlarge | 2000 | 15万 |
rmq.p2.4xlarge | 4000 | 25万 |
rmq.p2.6xlarge | 6000 | 30万 |
rmq.p2.10xlarge | 10000 | 60万 |
rmq.p2.20xlarge | 20000 | 80万 |
rmq.p2.30xlarge | 30000 | 100万 |
rmq.p2.40xlarge | 40000 | 120w |
rmq.p2.50xlarge | 50000 | 140w |
rmq.p2.100xlarge | 100000 | 220w |
rmq.p2.120xlarge | 120000 | 270w |
rmq.p2.150xlarge | 150000 | 330w |
rmq.p2.200xlarge | 200000 | 450w |
铂金版
实例规格 | 消息收发基础规格TPS上限(次/秒) | 可创建或订阅LiteTopic的最大数量 |
rmq.u2.10xlarge | 10000 | 60万 |
rmq.u2.20xlarge | 20000 | 80万 |
rmq.u2.30xlarge | 30000 | 100万 |
rmq.u2.40xlarge | 40000 | 120万 |
rmq.u2.50xlarge | 50000 | 140万 |
rmq.u2.60xlarge | 60000 | 160万 |
rmq.u2.70xlarge | 70000 | 170万 |
rmq.u2.80xlarge | 80000 | 180万 |
rmq.u2.90xlarge | 90000 | 200万 |
rmq.u2.100xlarge | 100000 | 220万 |
rmq.u2.120xlarge | 120000 | 270万 |
rmq.u2.150xlarge | 150000 | 330万 |
rmq.u2.200xlarge | 200000 | 450万 |
rmq.u2.250xlarge | 250000 | 560万 |
rmq.u2.300xlarge | 300000 | 630万 |
rmq.u2.350xlarge | 350000 | 750万 |
rmq.u2.400xlarge | 400000 | 930万 |
rmq.u2.450xlarge | 450000 | 1040万 |
rmq.u2.500xlarge | 500000 | 1160万 |
rmq.u2.550xlarge | 550000 | 1280万 |
rmq.u2.600xlarge | 600000 | 1400万 |
rmq.u2.1000xlarge | 1000000 | 2320万 |