全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
消息服务

主题+QueueEndpoint使用手册

更新时间:2017-06-07 13:26:11

本文档介绍如何使用python sdk中的sample代码,完成创建主题、创建QueueEndpoint订阅、创建队列、发布消息、从队列接收删除消息和删除主题操作。

1. 准备

  • 下载最新版python sdk,解压后进入mns_python_sdk子目录;
  • 打开sample.cfg文件,配置AccessKeyID、AccessKeySecret和Endpoint;
    • AccessKeyId、AccessKeySecret
    • Endpoint
      • 访问MNS的接入地址,登陆MNS控制台 单击右上角 获取Endpoint 查看;
      • 不同地域的接入地址不同;
    • SecurityToken
      • 阿里云访问控制服务提供的短期访问权限凭证,直接使用阿里云账号或者子账号访问不需要配置该项,了解详情;
  • 进入sample目录,后续使用的脚本都在这里;

2. 创建主题

运行 createtopic.py 创建主题;
默认创建的主题名称是 MySampleTopic,也可以通过参数指定主题名称;
主题详细信息请参考详情;

  • 运行
  1. $python createtopic.py MyTopic1
  2. Create Topic Succeed! TopicName:MyTopic1
  • 核心代码

endpoint,accid,acckey和token从第1步的配置文件中读取;

  1. #init my_account, my_topic
  2. my_account = Account(endpoint, accid, acckey, token)
  3. topic_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleTopic"
  4. my_topic = my_account.get_topic(topic_name)
  5. #you can get more information of TopicMeta from mns/topic.py
  6. topic_meta = TopicMeta()
  7. try:
  8. topic_url = my_topic.create(topic_meta)
  9. print "Create Topic Succeed! TopicName:%s\n" % topic_name
  10. except MNSExceptionBase, e:
  11. if e.type == "TopicAlreadyExist":
  12. print "Topic already exist, please delete it before creating or use it directly."
  13. sys.exit(0)
  14. print "Create Topic Fail! Exception:%s\n" % e

3. 创建队列

运行 createqueue.py 创建队列;
默认创建的队列名称是 MySampleQueue,也可以通过参数指定队列名称;
队列详细信息请参考详情;

  • 运行
  1. $python createqueue.py MyQueue1
  2. Create Queue Succeed! QueueName:MyQueue1
  • 核心代码

endpoint,accid,acckey和token从第1步的配置文件中读取;

  1. #init my_account, my_queue
  2. my_account = Account(endpoint, accid, acckey, token)
  3. queue_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleQueue"
  4. my_queue = my_account.get_queue(queue_name)
  5. #you can get more information of QueueMeta from mns/queue.py
  6. queue_meta = QueueMeta()
  7. try:
  8. queue_url = my_queue.create(queue_meta)
  9. print "Create Queue Succeed! QueueName:%s\n" % queue_name
  10. except MNSExceptionBase, e:
  11. if e.type == "QueueAlreadyExist":
  12. print "Queue already exist, please delete it before creating or use it directly."
  13. sys.exit(0)
  14. print "Create Queue Fail! Exception:%s\n" % e

4. 创建订阅

运行 subscribe.py 创建订阅;
第一个参数指定队列的地域,必须与主题在同一个地域,此处以杭州为例;
第二个参数指定队列的名称,使用第3步创建的队列名称;
第三个参数指定订阅的主题名称,如果第2步中指定了主题名称,这里同样指定主题名称;
第四个参数指定订阅的名称,默认是 MySampleTopic-Sub;
订阅详细信息请参考详情;

  • 运行
  1. $python subscribe_queueendpoint.py cn-hangzhou MyQueue1 MyTopic1 MyTopic1-Sub1
  2. Create Subscription Succeed! TopicName:MyTopic1 SubName:MyTopic1-Sub1 Endpoint:acs:mns:cn-hangzhou:127797386164059:queues/MyQueue1
  • 核心代码

endpoint,accid,acckey和token从第1步的配置文件中读取;

  1. region = sys.argv[1]
  2. queue_name = sys.argv[2]
  3. queue_endpoint = TopicHelper.generate_queue_endpoint(region, account_id, queue_name)
  4. #init my_account, my_topic, my_sub
  5. my_account = Account(endpoint, accid, acckey, token)
  6. topic_name = sys.argv[3] if len(sys.argv) > 3 else "MySampleTopic"
  7. my_topic = my_account.get_topic(topic_name)
  8. sub_name = sys.argv[4] if len(sys.argv) > 4 else "MySampleTopic-Sub"
  9. my_sub = my_topic.get_subscription(sub_name)
  10. #you can get more information of SubscriptionMeta from mns/subscription.py
  11. sub_meta = SubscriptionMeta(queue_endpoint, notify_content_format = SubscriptionNotifyContentFormat.SIMPLIFIED)
  12. try:
  13. topic_url = my_sub.subscribe(sub_meta)
  14. print "Create Subscription Succeed! TopicName:%s SubName:%s Endpoint:%s\n" % (topic_name, sub_name, queue_endpoint)
  15. except MNSExceptionBase, e:
  16. if e.type == "TopicNotExist":
  17. print "Topic not exist, please create topic."
  18. sys.exit(0)
  19. elif e.type == "SubscriptionAlreadyExist":
  20. print "Subscription already exist, please unsubscribe or use it directly."
  21. sys.exit(0)
  22. print "Create Subscription Fail! Exception:%s\n" % e

5. 发布消息

运行publishmessage.py 发布多条消息到主题中;
如果第2步中指定了主题名称,这里同样通过第一个参数指定主题名称;
消息详细信息请参考详情;

  • 运行
  1. $python publishmessage.py MyTopic1
  2. ==========Publish Message To Topic==========
  3. TopicName:MyTopic1
  4. MessageCount:3
  5. Publish Message Succeed. MessageBody:I am test message 0. MessageID:F6EA56633844DBFC-1-154BDFB8059-200000004
  6. Publish Message Succeed. MessageBody:I am test message 1. MessageID:F6EA56633844DBFC-1-154BDFB805F-200000005
  7. Publish Message Succeed. MessageBody:I am test message 2. MessageID:F6EA56633844DBFC-1-154BDFB8062-200000006
  • 核心代码

endpoint,accid,acckey和token从第1步的配置文件中读取;

  1. #init my_account, my_topic
  2. my_account = Account(endpoint, accid, acckey, token)
  3. topic_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleTopic"
  4. my_topic = my_account.get_topic(topic_name)
  5. #publish some messages
  6. msg_count = 3
  7. print "%sPublish Message To Topic%s\nTopicName:%s\nMessageCount:%s\n" % (10*"=", 10*"=", topic_name, msg_count)
  8. for i in range(msg_count):
  9. try:
  10. msg_body = "I am test message %s." % i
  11. msg = TopicMessage(msg_body)
  12. re_msg = my_topic.publish_message(msg)
  13. print "Publish Message Succeed. MessageBody:%s MessageID:%s" % (msg_body, re_msg.message_id)
  14. except MNSExceptionBase,e:
  15. if e.type == "TopicNotExist":
  16. print "Topic not exist, please create it."
  17. sys.exit(1)
  18. print "Publish Message Fail. Exception:%s" % e

6. 从队列获取和删除消息

第5步发布了多条消息到主题中,MNS 会将发布的消息推送给第4步指定的队列中;
运行recvdelmessage.py,接收并删除队列中的消息,直到队列为空;
第一个参数指定队列的名称,使用第4步指定的队列名;
第二个参数指定消息体不做base64解码,因为publish message时未编码;
程序中receive message使用long polling方式,指定 wait seconds为3秒,因此当队列为空时,程序会等待3秒;
消息详细信息请参考详情;

  1. $python recvdelmessage.py MyQueue1 false
  2. ==========Receive And Delete Message From Queue==========
  3. QueueName:MyQueue1
  4. WaitSeconds:3
  5. Receive Message Succeed! ReceiptHandle:1-ODU4OTkzNDU5My0xNDczMzkwMjkyLTEtOA== MessageBody:I am test message 0. MessageID:E56AE055BAA638AC-1-1570CE43AD3-200000001
  6. Delete Message Succeed! ReceiptHandle:1-ODU4OTkzNDU5My0xNDczMzkwMjkyLTEtOA==
  7. Receive Message Succeed! ReceiptHandle:1-ODU4OTkzNDU5NC0xNDczMzkwMjkyLTEtOA== MessageBody:I am test message 1. MessageID:E56AE055BAA638AC-1-1570CE43AE0-200000002
  8. Delete Message Succeed! ReceiptHandle:1-ODU4OTkzNDU5NC0xNDczMzkwMjkyLTEtOA==
  9. Receive Message Succeed! ReceiptHandle:1-ODU4OTkzNDU5My0xNDczMzkwMjkyLTItOA== MessageBody:I am test message 2. MessageID:CDAC88D223C0F9E3-2-1570CE43B2E-200000001
  10. Delete Message Succeed! ReceiptHandle:1-ODU4OTkzNDU5My0xNDczMzkwMjkyLTItOA==
  11. Queue is empty!

7. 删除主题

运行deletetopic.py 删除主题;
如果第2步中指定了主题名称,这里同样通过第一个参数指定主题名称;

  • 运行
  1. $python deletetopic.py MyTopic1
  2. Delete Topic Succeed! TopicName:MyTopic1
  • 核心代码
  1. #init my_account, my_topic
  2. my_account = Account(endpoint, accid, acckey, token)
  3. topic_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleTopic"
  4. my_topic = my_account.get_topic(topic_name)
  5. try:
  6. my_topic.delete()
  7. print "Delete Topic Succeed! TopicName:%s\n" % topic_name
  8. except MNSExceptionBase, e:
  9. print "Delete Topic Fail! Exception:%s\n" % e

8. 删除队列

运行deletequeue.py 删除队列

  • 运行
  1. $python deletequeue.py MyQueue1
  2. Delete Queue Succeed! QueueName:MyQueue1
  • 核心代码

endpoint,accid,acckey和token从第1步的配置文件中读取;

  1. #init my_account, my_queue
  2. my_account = Account(endpoint, accid, acckey, token)
  3. queue_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleQueue"
  4. my_queue = my_account.get_queue(queue_name)
  5. #delete queue
  6. try:
  7. my_queue.delete()
  8. print "Delete Queue Succeed! QueueName:%s\n" % queue_name
  9. except MNSExceptionBase, e:
  10. print "Delete Queue Fail! Exception:%s\n" % e
本文导读目录