文档

消费托管MQ接口

更新时间:

消费托管MQ接口主要用于获取托管MQ中的消息以及消息成功后进行确认的功能。

获取消息

获取托管MQ中的消息,方法名称:consumeMQMessage

  • 函数原型
    ConsumeMQMessageResponse consumeMQMessage(final ConsumeMQMessageRequest request)
  • 请求参数
    参数必选说明
    numOfMessages一次最多消费消息的条数,取值范围:1~16。
    streamId队列ID,即数据导出服务中的任务ID。
    waitSeconds长轮询时间,不填则为短轮询,取值范围:1~20,单位:秒。
  • 返回参数
    参数类型说明
    resultList<MQMessage>获取到的队列消息。
  • 结构体说明
    结构体参数说明
    MQMessagemessage_body收到消息数据。
    receipt_handle消息确认句柄,处理成功后需要通过 ackMQMessage接口进行消息确认。
  • 示例
    ConsumeMQMessageRequest request = new ConsumeMQMessageRequest();
    request.setReq_numOfMessages(5);            //一次最多消费消息的条数,
    request.setStreamId(streamId);              //队列ID,即数据导出服务中的任务ID
    ConsumeMQMessageResponse response = restClient.consumeMQMessage(request);

确认消息

消费托管MQ消息成功后,进行确认,方法名称:ackMQMessage

  • 函数原型
    AckMQMessageResponse ackMQMessage(final AckMQMessageRequest request)
  • 请求参数
    参数必选说明
    streamId队列ID,即数据导出服务中的任务ID。
    mqAckRequests需要确认的消息列表,数据为List<MQAckRequest>
  • 结构体说明。
    结构体参数说明
    MQAckRequestreceipt_handle需要确认的消息句柄。
    state确定的消息状态。消费成功,状态为 success;消费失败状态为 failed
  • 示例
    AckMQMessageRequest ackMQMessageRequest = new AckMQMessageRequest();
    ackMQMessageRequest.setStreamId(streamId);
    ackMQMessageRequest.setMqAckRequests(new ArrayList<MQAckRequest>(response.getResult().size()));
    for (MQMessage msg : response.getResult()) {
      MQAckRequest ackRequest = new MQAckRequest();
      ackRequest.setReceipt_handle(msg.getReceipt_handle());
      ackRequest.setState(worker(msg) ? "success" : "failed");
      ackMQMessageRequest.getMqAckRequests().add(ackRequest);
    }
    if (ackMQMessageRequest.getMqAckRequests().size() > 0) {
      // 确认消息的处理结果,失败的消息会在一定时间后重新被 consumeMQMessage 消费到
      AckMQMessageResponse ackMQMessageResponse = restClient.ackMQMessage(ackMQMessageRequest);
    }