消息队列 RocketMQ 版所提供的 TCP Java SDK 支持多线程消费,且适用于所有消息类型,本文介绍如何设置消费线程数的方法。

在启动 Consumer 时,设置一个 ConsumeThreadNums 属性即可。具体示例如下所示。

public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_001");
    properties.put(PropertyKeyConst.AccessKey, "xxxxxxxxxxxx");
    properties.put(PropertyKeyConst.SecretKey, "xxxxxxxxxxxx");
    /**
     * 设置消费端线程数固定为 20
     */
    properties.setProperty(PropertyKeyConst.ConsumeThreadNums,"20");
    Consumer consumer =ONSFactory.createConsumer(properties);
    consumer.subscribe("TestTopic", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println("Receive: " + message);
            return Action.CommitMessage;
        }
    });
    consumer.start();
    System.out.println("Consumer Started");
}