全部产品
云市场
云游戏

获取工作流队列消息

更新时间:2017-11-13 21:03:33

1. 问题场景

MTS通过消息服务(MNS)将工作流开始和结束的状态信息回调给客户,客户可以在应用端获取工作流执行状态进行业务逻辑处理。消息模式包括队列模式和通知模式,通知模式建议参考【工作流通知消息实例】,本文主要讲解获取队列模式的工作流消息。

2. 解决方案

2.1 MNS配置消息队列

首先用户需要在消息服务配置队列以接收工作流发出的消息。具体操作流程如下:消息服务->队列->选择数据中心->创建队列,界面如下图。

1

新建队列必填的参数包括队列名称和当前地域,其他参数均为可选值,这些参数的意义如下:

  • 消息接收长轮询等待时间:表示轮询等待的时间,也就是针对该队列的所有ReceiveMessage请求在Queue无消息时,都将默认进入到Polling等待状态,在等待期间一直保持无消息,则会返回MessageNotExist;如果在此期间有新的消息进入到Queue中,则会唤醒相应的ReceiveMessage请求进行返回。这里默认值为0秒,即关闭长轮询。

  • 取出消息隐藏时长:表示消息从队列中取出后保持隐藏状态的时间。消息从队列中取出后会被从可取状态(Active)变成隐藏状态(Inactive)后,这个时间一到,消息会从隐藏恢复成Active可取状态。这里默认值为30秒。

  • 消息最大长度:限定允许发送到该队列的消息体的最大长度,默认值为64KB,建议这里不要设置太小,因为工作流发送消息体内容较多,如果设置较小可能会导致无法接收到消息。

  • 消息存活时间: 表示消息在本队列中最长的存活时间,从发送到该队列开始经过此参数指定的时间后,不论消息是否被取出过都将被删除,目前默认值为4天。

  • 消息延时:表示消息可供消费的延迟时间,发送消息到本队列的所有消息默认将以本参数指定的秒数延后可被消费,默认值为0秒。

  • 开启logging:表示将该队列中的日志推送到OSS或者SLS中。这里需要在日志管理中配置,配置方法请参考【推送日志到OSS】和【推送日志到LogService】,默认为不开启状态。

2.2 工作流配置消息模式

用户接下来需要将工作流与消息服务中的该队列相关联,在编辑工作流的输入节点时即可设置消息队列,如下图。用户设置消息类别为队列,队列名称选择之前我们设置的名称即关联完成。

2

2.3 MNS队列接收示例代码

上述流程配置完成后即执行工作流并通过MNS队列获取其中的消息。获取MNS队列中的消息可以通过MNS的API/SDK方式获取。这里以Java SDK的使用作为示例进行演示。

首先需要获取MNS相关信息,主要包括:AccessKeyId、AccessKeySecret、MNSEndpoint和队列名称。AccessKeyId和AccessKeySecret获取方法请登陆阿里云管理控制台,在下图中的按钮中获取。

3

MNS队列的名称即是我们前述创建的队列的名称,可以在MNS控制台中的队列列表中查看。而MNSEndpoint表示连接该队列的地址,可以通过MNS控制台中的“获取Endpoint”按钮获取得到,如下图中可以查看到每个账号在每个数据中心对应三个MNSEndpoint,分别是公网地址、私网地址和VPC地址。公网地址即是在有公网IP的服务器上均可使用,私网地址是指同一数据中心的阿里云经典网络的云服务器ECS上连接MNS可以使用的地址,而VPC地址即是同一数据中心的阿里云VPC网络的云服务器ECS的内网连接地址。

4

接下来即是构建Java测试环境,这里我们采用maven方式构建project。下面即是我们添加的依赖项和示例代码。

  1. <dependency>
  2. <groupId>com.aliyun.mns</groupId>
  3. <artifactId>aliyun-sdk-mns</artifactId>
  4. <version>1.1.5</version>
  5. </dependency>
  1. public class ConsumerDemo {
  2. public static void main(String[] args) {
  3. CloudAccount account = new CloudAccount("YourAccessId", "YourAccessKey", "MNSEndpoint");
  4. MNSClient client = account.getMNSClient(); // 在程序中,CloudAccount以及MNSClient单例实现即可,多线程安全
  5. try{
  6. CloudQueue queue = client.getQueueRef("TestQueue");
  7. Message popMsg = queue.popMessage();
  8. if (popMsg != null){
  9. System.out.println("message handle: " + popMsg.getReceiptHandle());
  10. System.out.println("message body: " + popMsg.getMessageBodyAsString());
  11. System.out.println("message id: " + popMsg.getMessageId());
  12. System.out.println("message dequeue count:" + popMsg.getDequeueCount());
  13. //删除已经取出消费的消息
  14. queue.deleteMessage(popMsg.getReceiptHandle());
  15. System.out.println("delete message successfully.\n");
  16. }
  17. else{
  18. System.out.println("message not exist in TestQueue.\n");
  19. }
  20. } catch (ClientException ce)
  21. {
  22. System.out.println("Something wrong with the network connection between client and MNS service."
  23. + "Please check your network and DNS availablity.");
  24. ce.printStackTrace();
  25. } catch (ServiceException se)
  26. {
  27. se.printStackTrace();
  28. logger.error("MNS exception requestId:" + se.getRequestId(), se);
  29. if (se.getErrorCode() != null) {
  30. if (se.getErrorCode().equals("QueueNotExist"))
  31. {
  32. System.out.println("Queue is not exist.Please create before use");
  33. } else if (se.getErrorCode().equals("TimeExpired"))
  34. {
  35. System.out.println("The request is time expired. Please check your local machine timeclock");
  36. }
  37. /*
  38. you can get more MNS service error code from following link:
  39. https://help.aliyun.com/document_detail/mns/api_reference/error_code/error_code.html
  40. */
  41. }
  42. } catch (Exception e)
  43. {
  44. System.out.println("Unknown exception happened!");
  45. e.printStackTrace();
  46. }
  47. client.close();
  48. }
  49. }

3. 注意事项

  • 工作流需要配置同一数据中心的消息队列,不支持配置跨地域的消息队列。

  • 在队列中消费消息后需要调用deleteMessage方法删除该消息,否则会导致应用端重复消费消息。