轻量主题模型

本文介绍云消息队列 RocketMQ 版中轻量主题(LiteTopic)的定义、模型关系、内部属性、行为约束、版本兼容性及使用建议。

前提条件

  • 轻量主题目前只有非Serverless系列(包年包月、按量付费)和Serverless系列的独享实例支持。

  • 提交工单申请LiteTopic模型功能白名单,提单时需要提供购买实例的主账号uid和实例所属地域。

定义

轻量主题是云消息队列 RocketMQ 版中消息传输和存储的二级容器,用于标识同一类业务逻辑下不同子类(例如不同会话、任务等粒度)的消息。

轻量主题的作用主要如下:

  • 实现排他性消费,定义数据的二级隔离

建议将不同子类的数据拆分到不同的轻量主题中管理,通过轻量主题实现更细致的存储隔离性和订阅隔离性。

  • 定义数据的身份和权限

在基于主题的身份识别和权限管理基础上,可以通过轻量主题,进一步细分用户的身份与权限。

模型关系

在整个云消息队列 RocketMQ 版的领域模型中,轻量主题所处的流程和位置如下:

image.png

  • 主题(Topic)是云消息队列 RocketMQ 版中消息传输和存储的顶层容器。当类型为Lite类型时,Topic下可创建轻量主题(LiteTopic),由TopicLiteTopic共同唯一确认消息的存储容器。

  • 当类型为Lite类型时,每个存储容器默认由一个队列组成。

内部属性

轻量主题名称

  • 定义:轻量主题的名称,用于标识轻量主题,轻量主题名称在所属主题内全局唯一。

  • 取值:当主题类型为Lite时,用户对message进行了setLiteTopic,如对应的轻量主题不存在,系统会自动创建。

  • 约束:请参见参数限制

过期时间

  • 定义:轻量主题的过期时间,当轻量主题距离最近一次消息写入时间超过过期时间后,该轻量主题会被自动删除。删除是指释放轻量主题占用的个数,即占用总数-1。

  • 取值:当创建主题类型为Lite时,可以设置过期时间 expiration 值。

  • 约束:请参见参数限制

版本兼容性

  • 服务端版本:5.0-rmq-20251024-1 版本及以上

  • 客户端版本:RocketMQ gRPC 5.1.0 版本及以上

轻量类型和普通类型主题的差异

场景

对比项

轻量类型主题

普通类型主题

消息存储

一级主题

相同。都需要预先创建主题资源。

二级主题

可以在Topic下创建百万量级的二级主题资源LiteTopic,该二级资源有许多新特性。

无二级主题资源。

自动化生命周期管理

二级主题LiteTopic的生命周期可以自动化管理:

  • 自动创建:发送或订阅的时候,如果二级主题LiteTopic不存在,则自动创建该资源。

  • 自动删除:设置expiration时间,持续无新消息发送后会自动删除。

顺序性

每个LiteTopic只创建一个队列,同一个队列中的消息存储是顺序的。

  • 每个LiteTopic

会创建多个队列,只有分区顺序Topic

收发并发TPS上限

由于只有一个队列,每个LiteTopic 的TPS上限是有限的。

Topic下可以创建百万量级的LiteTopic,总TPS的上限是可以根据LiteTopic的增加而增加。

TopicTPS可以根据队列数量和集群机器节点数量横向扩容。

消息消费

订阅关系一致性

可以不一致。

同一Group下,每个消费者可订阅不同的LiteTopic集合,Group的限制弱化。

需要一致。

同一Group下,每个消费者的订阅关系需要保持一致,共享目标主题下的消息。

顺序性

顺序消费,一个LiteTopic下的消息只能被一个消费者线程处理。

可选择并发消费或顺序消费。

动态订阅

每个消费者可以动态增加或删除指定某个LiteTopic的订阅

单个消费者可以订阅的LiteTopic的数量

每个消费者可以订阅千量级的LiteTopic

可观测

Metrics指标数据

有消息堆积量指标

无消息处理滞后时间指标

有消息堆积量指标

有消息处理之后时间指标

消息轨迹

相同

常见轻量主题应用场景

应用场景 1:Multi-Agent 的异步通信,解决长耗时调用阻塞痛点

随着AI需求场景变得更加复杂,大多数单Agent在复杂场景中面临着局限性:缺乏专业化分工、难以对多领域进行整合;无法实现动态协作决策。单Agent应用和单Agent工作流会逐步转向 Multi-Agent 应用。但因为 AI 任务长耗时的特点,同步调用会造成调用者的线程阻塞,存在大规模协作扩展性问题。

image.png

如上图所示 Multi-Agent的工作流程为:Supervisor Agent 负责将需求拆分给两个子Agent;两个子Agent负责各自领域问题解答并将结果返回给Supervisor Agent; Supervisor Agent将结果汇总返回给Web端。使用 RocketMQ 异步通信方案流程如下:

  1. 在接收请求的流程中

    1. 为每个子Agent创建一个Topic (Request)用于请求任务的缓冲队列,可以是优先级Topic,能先处理高优任务。

    2. Supervisor Agent 将拆分任务信息发送到对应的请求主题中。

  2. 在返回响应结果流程中

    1. Supervisor Agent 创建 Lite 类型的Topic (Response) 并订阅这个Topic。

    2. 子 Agent 处理将每个任务的响应结果发送到Topic (Response) 的LiteTopic中。LiteTopic可以用任务ID命名,为每个任务创建一个专属的LiteTopic。

    3. Supervisor Agent 通过订阅实时获取结果,然后通过HTTP SSE协议推送给Web端。

应用场景 2:分布式会话状态管理,终结 AI 应用的会话状态管理难题

AI 应用的交互模式具有特殊性,即长耗时、多轮次且高度依赖高成本计算的会话。当应用依赖 SSE 等长连接时,一旦连接中断(如网关重启、连接超时、网络不稳定触发),不仅会导致当前会话上下文的丢失,更会直接造成已投入的 AI 任务作废,从而浪费宝贵的算力资源。

image.png

如上图所示,和场景 1的返回响应结果流程相同,使用Lite 类型的Topic作为实时通知响应结果。这里每个LiteTopic可以使用 SessionID 命名(例如 chatbot/{sessionID}),这样会话结果都作为消息在这个主题中有序传递。长连接重连后继续保持会话的连续性的解决方案如下:

  1. Web2 和应用服务节点 1 建立长链接,创建会话Session2。

  2. 应用服务节点 1 监听 LiteTopic [chat/SessionID2]

  3. 大模型任务调度组件,根据请求的SessionID信息,将返回结果发送到 LiteTopic [chat/SessionID2]

  4. 由于网络等异常,WebSocket 自动重连到了应用服务节点2。

  5. 应用服务节点 1 取消订阅LiteTopic [chat/SessionID2],应用服务节点 2 订阅LiteTopic [chat/SessionID2]。

  6. LiteTopic [chat/SessionID2] 根据之前的消费进度,将后续未消费的消息继续推送给应用服务节点 2。保证了会话状态和会话数据的连续性。

示例代码

完整示例请查看RocketMQ 5.x gRPC SDK中的示例代码。

发送消息

Producer producer = provider.newProducerBuilder()
    .setTopics(topic)
    .setClientConfiguration(clientConfiguration)
    .build();

final Message message = provider.newMessageBuilder()
    .setTopic(topic)
    //设置消息索引键,可根据关键字精确查找某条消息。
    .setKeys("messageKey")
    //设置LiteTopic
    .setLiteTopic("lite-topic-1")
    //消息体
    .setBody("messageBody".getBytes())
    .build();

try {
    final SendReceipt sendReceipt = producer.send(message);
    log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (LiteTopicQuotaExceededException e) {
    // LiteTopic配额超限,评估并增加配额
    log.error("Lite topic quota exceeded", e);
} catch (Throwable t) {
    log.error("Failed to send message", t);
}

消费消息

需要使用 LitePushConsumer 消费类:

//初始化LitePushConsumer,需要绑定消费者分组ConsumerGroup、目标主题Topic、通信参数等。
LitePushConsumer litePushConsumer = provider.newLitePushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // 控制台创建ConsumerGroup时绑定的Topic
    .bindTopic(topicName)
    //设置消费者分组
    .setConsumerGroup(consumerGroup)
    .setMessageListener(messageView -> {
        //处理消息并返回消费结果。
        LOGGER.info("Consume message={}", messageView);
        return ConsumeResult.SUCCESS;
    })
    .build();

try {
    // 订阅感兴趣的LiteTopic集合
    litePushConsumer.subscribeLite("lite-topic-1");
    litePushConsumer.subscribeLite("lite-topic-2");
    litePushConsumer.subscribeLite("lite-topic-3");
} catch (LiteSubscriptionQuotaExceededException e) {
    // LiteTopic配额超限,评估并增加配额
    log.error("Lite subscription quota exceeded", e);
} catch (Throwable t) {
    log.error("Failed to subscribe lite topic", t);
}

// 业务处理完毕后,及时取消不再使用的LiteTopic订阅
litePushConsumer.unsubscribeLite("lite-topic-3");

// 获取当前订阅的LiteTopic集合
Set<String> liteTopicSet = litePushConsumer.getLiteTopicSet();

动态修改订阅关系

/**
 * 动态添加订阅关系
 * subscribeLite() 方法会发起网络请求并执行配额验证,因此可能会调用失败。
 * 请务必检查此调用的结果,以确保订阅被成功添加。
 * 可能的失败场景包括:
 * 1. 网络请求错误,可以重试。
 * 2. 配额验证失败,抛出 LiteSubscriptionQuotaExceededException 异常。
 * 请评估配额是否满足需求,并及时使用 unsubscribeLite() 取消订阅不再使用的主题以释放资源。
 */

litePushConsumer.subscribeLite("lite-topic-1");

//动态删除订阅关系
litePushConsumer.unsubscribeLite("lite-topic-1");

使用限制

  1. 单个消费者能订阅的LiteTopic数量上限为2000个(可提工单调整)。

  2. 单个LiteTopic的消费TPS为上限200TPS。

  3. 为保障服务稳定性,单个实例对可创建或可订阅的 LiteTopic 数量设置了上限。具体配额(可提工单调整)请参见下表。

    1. LiteTopic 创建数量

      • 定义:指单个实例在其生命周期内,当前已创建并存在的 LiteTopic 总量。

      • 触发条件与影响:当此数量达到上限时,若客户端尝试向一个尚未存在的 LiteTopic 发送消息(该操作会触发自动创建),系统将无法创建该 LiteTopic,并返回消息发送失败的错误。

    2. LiteTopic 订阅数量

      • 定义:指实例下所有在线的消费者客户端与 LiteTopic 建立的有效订阅关系的总和。这是一个动态变化的数值。

      • 影响:当此数量达到上限时,任何消费者客户端尝试订阅一个新的 LiteTopic 都会失败,无法建立新的消费关系。

      • 特殊规则:请注意,即使一个 LiteTopic 已从系统中删除,但若仍有消费者客户端保持对其的订阅关系,该订阅关系依然会被计入订阅总数,直至消费者停止订阅。

Serverless系列实例

部署架构

容量模式

规格

可创建或订阅LiteTopic的最大数量

独享

预留+弹性

5000

30

10000

60

15000

72

[2万, 5万]

100

(5万, 10万]

150

(10万, 20万]

240

(20万, 30万]

470

(30万, 50万]

630

(50万, 100万]

1160

Serverless系列(包年包月、按量付费)实例

标准版

实例规格

消息收发基础规格TPS上限(次/秒)

可创建或订阅LiteTopic的最大数量

rmq.s2.2xlarge

2000

15

rmq.s2.4xlarge

4000

25

rmq.s2.6xlarge

6000

30

专业版

实例规格

消息收发基础规格TPS上限(次/秒)

可创建或订阅LiteTopic的最大数量

rmq.p2.2xlarge

2000

15

rmq.p2.4xlarge

4000

25

rmq.p2.6xlarge

6000

30

rmq.p2.10xlarge

10000

60

rmq.p2.20xlarge

20000

80

rmq.p2.30xlarge

30000

100

rmq.p2.40xlarge

40000

120w

rmq.p2.50xlarge

50000

140w

rmq.p2.100xlarge

100000

220w

rmq.p2.120xlarge

120000

270w

rmq.p2.150xlarge

150000

330w

rmq.p2.200xlarge

200000

450w

铂金版

实例规格

消息收发基础规格TPS上限(次/秒)

可创建或订阅LiteTopic的最大数量

rmq.u2.10xlarge

10000

60

rmq.u2.20xlarge

20000

80

rmq.u2.30xlarge

30000

100

rmq.u2.40xlarge

40000

120

rmq.u2.50xlarge

50000

140

rmq.u2.60xlarge

60000

160

rmq.u2.70xlarge

70000

170

rmq.u2.80xlarge

80000

180

rmq.u2.90xlarge

90000

200

rmq.u2.100xlarge

100000

220

rmq.u2.120xlarge

120000

270

rmq.u2.150xlarge

150000

330

rmq.u2.200xlarge

200000

450

rmq.u2.250xlarge

250000

560

rmq.u2.300xlarge

300000

630

rmq.u2.350xlarge

350000

750

rmq.u2.400xlarge

400000

930

rmq.u2.450xlarge

450000

1040

rmq.u2.500xlarge

500000

1160

rmq.u2.550xlarge

550000

1280

rmq.u2.600xlarge

600000

1400

rmq.u2.1000xlarge

1000000

2320