主题+QueueEndpoint使用手册

本文介绍如何使用Python SDK中的sample代码,完成创建主题、创建队列、创建订阅、发布消息和删除主题等操作。

步骤一:准备工作

  1. 下载最新版Python SDK,解压后进入mns_python_sdk子目录。本文示例代码以Python 2.x为例。

  2. 打开sample.cfg文件,配置AccessKeyId、AccessKeySecret、Endpoint和SecurityToken。

    • AccessKeyId、AccessKeySecret

    • Endpoint

      • 访问消息服务MNS的接入地址,登录MNS控制台查看。具体操作,请参见获取接入点

      • 不同地域的接入地址不同。

    • SecurityToken

      • 阿里云访问控制服务提供的短期访问权限凭证,使用阿里云账号或者RAM用户访问不需要配置该项。更多信息,请参见什么是STS

  3. 进入sample目录,后续使用的脚本都在该目录。

步骤二:创建主题

运行createtopic.py创建主题。默认创建的主题名称是MySampleTopic,也可以通过参数指定主题名称。更多信息,请参见Topic

  • 运行以下命令:

    python createtopic.py MyTopic1         

    返回结果如下:

    Create Topic Succeed! TopicName:MyTopic1 
  • 核心代码:

    endpointaccidacckeytoken从步骤一配置的文件sample.cfg中读取。

    my_account = Account(endpoint, accid, acckey, token)
    topic_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleTopic"
    my_topic = my_account.get_topic(topic_name)
    
    topic_meta = TopicMeta()
    try:
        topic_url = my_topic.create(topic_meta)
        print "Create Topic Succeed! TopicName:%s\n" % topic_name
    except MNSExceptionBase, e:
        if e.type == "TopicAlreadyExist":
            print "Topic already exist, please delete it before creating or use it directly."
            sys.exit(0)
        print "Create Topic Fail! Exception:%s\n" % e         

步骤三:创建队列

运行createqueue.py创建队列。默认创建的队列名称是MySampleQueue,也可以通过参数指定队列名称。更多信息,请参见Queue

  • 运行以下命令:

    python createqueue.py MyQueue1      

    返回结果如下:

    Create Queue Succeed! QueueName:MyQueue1 
  • 核心代码:

    endpointaccidacckeytoken从步骤一配置的文件sample.cfg中读取。

    my_account = Account(endpoint, accid, acckey, token)
    queue_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleQueue"
    my_queue = my_account.get_queue(queue_name)
    
    queue_meta = QueueMeta()
    try:
        queue_url = my_queue.create(queue_meta)
        print "Create Queue Succeed! QueueName:%s\n" % queue_name
    except MNSExceptionBase, e:
        if e.type == "QueueAlreadyExist":
            print "Queue already exist, please delete it before creating or use it directly."
            sys.exit(0)
        print "Create Queue Fail! Exception:%s\n" % e     

步骤四:创建订阅

运行subscribe.py创建订阅。文件中四个参数的含义如下所示。更多信息,请参见Subscription

  • 第一个参数指定队列的地域,必须与主题在同一个地域(此处以杭州为例);

  • 第二个参数指定队列的名称,使用步骤三创建的队列名称;

  • 第三个参数指定订阅的主题名称,如果步骤二中指定了主题名称,这里同样指定主题名称;

  • 第四个参数指定订阅的名称,默认是MySampleTopic-Sub。

  • 运行以下命令:

    python subscribe_queueendpoint.py cn-hangzhou MyQueue1 MyTopic1 MyTopic1-Sub1

    返回结果如下:

    Create Subscription Succeed! TopicName:MyTopic1 SubName:MyTopic1-Sub1 Endpoint:acs:mns:cn-hangzhou:123456789098****:queues/MyQueue1
  • 核心代码:

    endpointaccidacckeytoken从步骤一配置的文件sample.cfg中读取。

    region = sys.argv[1]
    queue_name = sys.argv[2]
    queue_endpoint = TopicHelper.generate_queue_endpoint(region, account_id, queue_name)
    
    my_account = Account(endpoint, accid, acckey, token)
    topic_name = sys.argv[3] if len(sys.argv) > 3 else "MySampleTopic"
    my_topic = my_account.get_topic(topic_name)
    sub_name = sys.argv[4] if len(sys.argv) > 4 else "MySampleTopic-Sub"
    my_sub = my_topic.get_subscription(sub_name)
    
    sub_meta = SubscriptionMeta(queue_endpoint, notify_content_format = SubscriptionNotifyContentFormat.SIMPLIFIED)
    try:
        topic_url = my_sub.subscribe(sub_meta)
        print "Create Subscription Succeed! TopicName:%s SubName:%s Endpoint:%s\n" % (topic_name, sub_name, queue_endpoint)
    except MNSExceptionBase, e:
        if e.type == "TopicNotExist":
            print "Topic not exist, please create topic."
            sys.exit(0)
        elif e.type == "SubscriptionAlreadyExist":
            print "Subscription already exist, please unsubscribe or use it directly."
            sys.exit(0)
        print "Create Subscription Fail! Exception:%s\n" % e

步骤五:发布消息

运行publishmessage.py发布多条消息到主题中。如果步骤二中指定了主题名称,这里同样通过第一个参数指定主题名称。更多信息,请参见TopicMessage

  • 运行以下命令:

    python publishmessage.py MyTopic1        

    返回结果如下:

    ==========Publish Message To Topic==========
    TopicName:MyTopic1
    MessageCount:3
    
    Publish Message Succeed. MessageBody:I am test message 0. MessageID:F6EA56633844DBFC-1-154BDFB****-200000004
    Publish Message Succeed. MessageBody:I am test message 1. MessageID:F6EA56633844DBFC-1-154BDFB****-200000005
    Publish Message Succeed. MessageBody:I am test message 2. MessageID:F6EA56633844DBFC-1-154BDFB****-200000006 
  • 核心代码:

    endpointaccidacckeytoken从步骤一配置的文件sample.cfg中读取。

    my_account = Account(endpoint, accid, acckey, token)
    topic_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleTopic"
    my_topic = my_account.get_topic(topic_name)
    
    msg_count = 3
    print "%sPublish Message To Topic%s\nTopicName:%s\nMessageCount:%s\n" % (10*"=", 10*"=", topic_name, msg_count)
    
    for i in range(msg_count):
        try:
            msg_body = "I am test message %s." % i
            msg = TopicMessage(msg_body)
            re_msg = my_topic.publish_message(msg)
            print "Publish Message Succeed. MessageBody:%s MessageID:%s" % (msg_body, re_msg.message_id)
        except MNSExceptionBase,e:
            if e.type == "TopicNotExist":
                print "Topic not exist, please create it."
                sys.exit(1)
            print "Publish Message Fail. Exception:%s" % e          

步骤六:从队列获取和删除消息

步骤五中,发布了多条消息到主题,消息服务MNS会将发布的消息推送给步骤四指定的队列中。

运行recvdelmessage.py,接收并删除队列中的消息,直到队列为空。

第一个参数指定队列的名称,使用步骤四指定的队列名;第二个参数指定消息体不做Base64解码,因为publish message时未编码。程序中receive message使用long polling方式,指定wait seconds为3秒,因此当队列为空时,程序会等待3秒。更多信息,请参见QueueMessage

python recvdelmessage.py MyQueue1 false

返回结果如下:

==========Receive And Delete Message From Queue==========
QueueName:MyQueue1
WaitSeconds:3

Receive Message Succeed! ReceiptHandle:1-ODU4OTkzNDU5My0xNDczMzkwMjkyLTEtOA== MessageBody:I am test message 0. MessageID:E56AE055BAA638AC-1-1570CE4****-200000001
Delete Message Succeed!  ReceiptHandle:1-ODU4OTkzNDU5My0xNDczMzkwMjkyLTEtOA==
Receive Message Succeed! ReceiptHandle:1-ODU4OTkzNDU5NC0xNDczMzkwMjkyLTEtOA== MessageBody:I am test message 1. MessageID:E56AE055BAA638AC-1-1570CE4****-200000002
Delete Message Succeed!  ReceiptHandle:1-ODU4OTkzNDU5NC0xNDczMzkwMjkyLTEtOA==
Receive Message Succeed! ReceiptHandle:1-ODU4OTkzNDU5My0xNDczMzkwMjkyLTItOA== MessageBody:I am test message 2. MessageID:CDAC88D223C0F9E3-2-1570CE4****-200000001
Delete Message Succeed!  ReceiptHandle:1-ODU4OTkzNDU5My0xNDczMzkwMjkyLTItOA==
Queue is empty!

步骤七:删除主题

运行deletetopic.py删除主题。如果步骤二中指定了主题名称,这里同样通过第一个参数指定主题名称。

  • 运行以下命令:

    python deletetopic.py MyTopic1

    返回结果如下:

    Delete Topic Succeed! TopicName:MyTopic1
  • 核心代码:

    endpointaccidacckeytoken从步骤一配置的文件sample.cfg中读取。

    my_account = Account(endpoint, accid, acckey, token)
    topic_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleTopic"
    my_topic = my_account.get_topic(topic_name)
    
    try:
        my_topic.delete()
        print "Delete Topic Succeed! TopicName:%s\n" % topic_name
    except MNSExceptionBase, e:
        print "Delete Topic Fail! Exception:%s\n" % e         

步骤八:删除队列

运行deletequeue.py删除队列。

  • 运行以下命令:

    python deletequeue.py MyQueue1

    返回结果如下:

    Delete Queue Succeed! QueueName:MyQueue1
  • 核心代码:

    endpointaccidacckeytoken从步骤一配置的文件sample.cfg中读取。

    my_account = Account(endpoint, accid, acckey, token)
    queue_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleQueue"
    my_queue = my_account.get_queue(queue_name)
    
    #delete queue
    try:
        my_queue.delete()
        print "Delete Queue Succeed! QueueName:%s\n" % queue_name
    except MNSExceptionBase, e:
        print "Delete Queue Fail! Exception:%s\n" % e