消费托管MQ接口主要用于获取托管MQ中的消息以及消息成功后进行确认的功能。
获取消息
获取托管MQ中的消息,方法名称:consumeMQMessage。
- 函数原型
ConsumeMQMessageResponse consumeMQMessage(final ConsumeMQMessageRequest request)
- 请求参数
参数 必选 说明 numOfMessages 是 一次最多消费消息的条数,取值范围:1~16。 streamId 是 队列ID,即数据导出服务中的任务ID。 waitSeconds 否 长轮询时间,不填则为短轮询,取值范围:1~20,单位:秒。 - 返回参数
参数 类型 说明 result List<MQMessage>
获取到的队列消息。 - 结构体说明
结构体 参数 说明 MQMessage message_body 收到消息数据。 receipt_handle 消息确认句柄,处理成功后需要通过 ackMQMessage
接口进行消息确认。 - 示例
ConsumeMQMessageRequest request = new ConsumeMQMessageRequest(); request.setReq_numOfMessages(5); //一次最多消费消息的条数, request.setStreamId(streamId); //队列ID,即数据导出服务中的任务ID ConsumeMQMessageResponse response = restClient.consumeMQMessage(request);
确认消息
消费托管MQ消息成功后,进行确认,方法名称:ackMQMessage。
- 函数原型
AckMQMessageResponse ackMQMessage(final AckMQMessageRequest request)
- 请求参数
参数 必选 说明 streamId 是 队列ID,即数据导出服务中的任务ID。 mqAckRequests 是 需要确认的消息列表,数据为 List<MQAckRequest>
。 - 结构体说明。
结构体 参数 说明 MQAckRequest receipt_handle 需要确认的消息句柄。 state 确定的消息状态。消费成功,状态为 success
;消费失败状态为failed
。 - 示例
AckMQMessageRequest ackMQMessageRequest = new AckMQMessageRequest(); ackMQMessageRequest.setStreamId(streamId); ackMQMessageRequest.setMqAckRequests(new ArrayList<MQAckRequest>(response.getResult().size())); for (MQMessage msg : response.getResult()) { MQAckRequest ackRequest = new MQAckRequest(); ackRequest.setReceipt_handle(msg.getReceipt_handle()); ackRequest.setState(worker(msg) ? "success" : "failed"); ackMQMessageRequest.getMqAckRequests().add(ackRequest); } if (ackMQMessageRequest.getMqAckRequests().size() > 0) { // 确认消息的处理结果,失败的消息会在一定时间后重新被 consumeMQMessage 消费到 AckMQMessageResponse ackMQMessageResponse = restClient.ackMQMessage(ackMQMessageRequest); }
文档内容是否对您有帮助?