全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
消息队列 MQ

Python 收发消息

更新时间:2017-08-28 16:05:19

本文主要描述如何在 Python 环境下使用 HTTP 协议收发 MQ 消息。


运行环境准备

用 HTTP 协议发送或者接收消息 ,请完成以下环境准备工作。

Windows

  1. 从 Python 官网(https://www.python.org/downloads/windows/) 下载并安装 Python 2.7。
  2. 打开 Windows 终端窗口(dos界面),输入 python 命令,检查 Python 是否安装成功, 如下图所示。

Linux/Unix

您可以在 Linux 终端输入 python 指令确认是否有预装。如果有执行信息,则说明本台机器已经预装。如果没有,请按以下步骤进行安装:

  1. 从 Python 官网下载 Python2.7 Linux 版本安装包:http://www.python.org/download/

  2. 在 Python2.7 安装包下载保存目录下,运行指令进行解压,例如:tar –xzf python-2.7.11.tgz

  3. 进入2.7.11目录,执行./configure 指令。此步骤主要是用于生成 makefile 文件。

  4. 执行 make 指令进行实际编译。

  5. 执行 make install 指令进行安装操作。

  6. 安装完毕后执行 python 命令检查安装是否成功,如下图:


运行示例代码

按照以下步骤运行示例代码。

Windows

  1. 将下文具体示例程序小节所提供的配置文件以及示例程序拷贝到本地,并保存为 .py 文件,如下图:

  2. 根据下文示例代码里的说明修改 user.properties 文件中的相关字段。

  3. 将终端窗口切换到保存 .py 的文件目录,执行保存的 python 文件,如下图:

Linux/Unix

步骤参见 Windows。执行结果参照下图:


具体示例程序

您可以参考以下示例程序测试消息收发功能。

1.配置文件

您需要设置配置文件(user.properties)的相关内容,具体请参考申请 MQ 资源

  1. [property]
  2. #您在控制台创建的Topic
  3. Topic=xxx
  4. #公测集群URL
  5. URL=http://publictest-rest.ons.aliyun.com
  6. #阿里云官网身份验证访问码
  7. Ak=xxx
  8. #阿里云身份验证密钥
  9. Sk=xxx
  10. #MQ控制台创建的Producer ID
  11. ProducerID=xxx
  12. #MQ控制台创建的Consumer ID
  13. ConsumerID=xxx

说明:URL中的 Key,Tag 以及 POST Content-Type 没有任何的限制,只要确保 Key 和 Tag 相同唯一即可,可以放在 user.properties 里面。

2.发送消息示例程序

通过 HTTP 协议发送消息,请参考以下示例代码。

  1. #encoding:utf-8
  2. import ConfigParser
  3. import hashlib
  4. import httplib
  5. import time
  6. from urlparse import urlparse
  7. from Util import parseURL,calSignature
  8. """
  9. 消息发布者
  10. """
  11. class HttpProducer(object):
  12. def __init__(self):
  13. """签名值"""
  14. self.signature = "Signature"
  15. """ProducerID"""
  16. self.producerid = "ProducerID"
  17. """消息主题"""
  18. self.topic = "Topic"
  19. """访问码"""
  20. self.ak = "AccessKey"
  21. """配置文件解析器"""
  22. self.cf = ConfigParser.ConfigParser()
  23. """MD5对象"""
  24. self.md5 = hashlib.md5()
  25. """
  26. 发布消息主流程
  27. """
  28. def process(self):
  29. """读取配置文件"""
  30. self.cf.read("user.properties")
  31. """读取消息主题"""
  32. topic = self.cf.get("property","Topic")
  33. """存储消息URL路径"""
  34. url = self.cf.get("property","URL")
  35. """访问码"""
  36. ak = self.cf.get("property","Ak")
  37. """密钥"""
  38. sk = self.cf.get("property","Sk")
  39. """Producer ID"""
  40. pid = self.cf.get("property","ProducerID")
  41. """HTTP请求主体内容"""
  42. content = U"中文".encode('utf-8')
  43. """分隔符"""
  44. newline = "\n"
  45. """获取URL域名地址"""
  46. urlname = urlparse(url).hostname
  47. """根据HTPP主体内容计算MD5值"""
  48. self.md5.update(content)
  49. contentmd5 = self.md5.hexdigest()
  50. """建立HTTP连接对象"""
  51. conn = httplib.HTTPConnection(parseURL(urlname))
  52. try:
  53. for index in range(0,100):
  54. """时间戳"""
  55. date = repr(int(time.time() * 1000))[0:13]
  56. """构造签名字符串"""
  57. signString = str(topic + newline + pid + newline + contentmd5 + newline + date)
  58. """计算签名"""
  59. sign = calSignature(signString,sk)
  60. """内容类型"""
  61. contentFlag ="Content-type"
  62. """HTTP请求头部对象"""
  63. headers = {
  64. self.signature : sign,
  65. self.ak : ak,
  66. self.producerid : pid,
  67. contentFlag : "text/html;charset=UTF-8"
  68. }
  69. """开始发送HTTP请求消息"""
  70. conn.request(method="POST",url=url + "/message/?topic="+topic+"&time="+date+"&tag=http&key=http",
  71. body=content,
  72. headers=headers)
  73. """获取HTTP应答消息"""
  74. response = conn.getresponse()
  75. """读取HTTP应答内容"""
  76. msg = response.read()
  77. print "response:"+msg
  78. except Exception,e:
  79. print e
  80. finally:
  81. conn.close()
  82. """流程入口"""
  83. if __name__ == '__main__':
  84. """创建消息发布者"""
  85. producer = HttpProducer()
  86. """开启消息发布者"""
  87. producer.process()

3. 接收消息示例程序

通过 HTTP 协议接收消息,请参考以下示例代码。

  1. #encoding:utf-8
  2. import ConfigParser
  3. import httplib
  4. import json
  5. import time
  6. from urlparse import urlparse
  7. from Util import parseURL,calSignature
  8. """
  9. 消息订阅者
  10. """
  11. class HttpConsumer(object):
  12. def __init__(self):
  13. """签名字段"""
  14. self.signature = "Signature"
  15. """Consumer ID"""
  16. self.consumerid = "ConsumerID"
  17. """消费主题"""
  18. self.topic = "Topic"
  19. """访问码"""
  20. self.ak = "AccessKey"
  21. """配置文件解析器"""
  22. self.cf = ConfigParser.ConfigParser()
  23. """
  24. 订阅消息流程
  25. """
  26. def process(self):
  27. """开始读取配置文件"""
  28. self.cf.read("user.properties")
  29. """读取主题"""
  30. topic = self.cf.get("property", "Topic")
  31. """存储消息的URL路径"""
  32. url = self.cf.get("property", "URL")
  33. """访问码"""
  34. ak = self.cf.get("property", "Ak")
  35. """密钥"""
  36. sk = self.cf.get("property", "Sk")
  37. """Consumer ID"""
  38. cid = self.cf.get("property", "ConsumerID")
  39. newline = "\n"
  40. """获取URL主机域名地址"""
  41. urlname = urlparse(url).hostname
  42. """连接存储消息的服务器"""
  43. conn = httplib.HTTPConnection(parseURL(urlname))
  44. while True:
  45. try:
  46. """时间戳"""
  47. date = repr(int(time.time() * 1000))[0:13]
  48. """构造签名字符串"""
  49. signString = topic + newline + cid + newline + date
  50. """计算签名值"""
  51. sign = calSignature(signString,sk)
  52. """请求消息HTTP头部"""
  53. headers = {
  54. self.signature : sign,
  55. self.ak : ak,
  56. self.consumerid : cid
  57. }
  58. """开始发送获取消息的HTTP请求"""
  59. conn.request(method="GET",url=url+"/message/?topic="+topic+"&time="+date+"&num=32",headers=headers)
  60. """获取HTTP应答消息"""
  61. response = conn.getresponse()
  62. """验证应答消息状态值"""
  63. if response.status != 200:
  64. continue
  65. """从应答消息中读取实际的消息内容"""
  66. msg = response.read()
  67. """将实际的消费消息进行解码"""
  68. messages = json.loads(msg)
  69. if len(messages) == 0:
  70. time.sleep(2)
  71. continue
  72. """依次获取每条消费消息"""
  73. for message in messages:
  74. """计算时间戳"""
  75. date = repr(int(time.time() * 1000))[0:13]
  76. """构建删除消费消息URL路径"""
  77. delUrl = url + "/message/?msgHandle="+message['msgHandle'] + "&topic="+topic+"&time="+date
  78. """构造签名字符串"""
  79. signString = topic + newline + cid + newline + message['msgHandle'] + newline + date
  80. """进行签名"""
  81. sign = calSignature(signString,sk)
  82. """构造删除消费消息HTTP头部"""
  83. delheaders = {
  84. self.signature : sign,
  85. self.ak : ak,
  86. self.consumerid : cid,
  87. }
  88. """发送删除消息请求"""
  89. conn.request(method="DELETE", url=delUrl, headers=delheaders)
  90. """获取请求应答"""
  91. response = conn.getresponse()
  92. """读取应答内容"""
  93. msg = response.read()
  94. print "delete msg:"+msg
  95. except Exception,e:
  96. print e
  97. conn.close()
  98. """启动入口"""
  99. if __name__ == '__main__':
  100. """构造消息订阅者"""
  101. consumer = HttpConsumer()
  102. """开始启动消息订阅者"""
  103. consumer.process()

4. 工具方法示例程序

以下为示例中使用的工具方法。

  1. #encoding:utf-8
  2. import socket
  3. import hmac
  4. from hashlib import sha1
  5. """
  6. 解析URL
  7. """
  8. def parseURL(url):
  9. iplist = socket.gethostbyname_ex(url)
  10. if len(iplist) == 0:
  11. return None
  12. ips = iplist[2]
  13. if len(ips) == 0:
  14. return None
  15. return ips[0]
  16. """
  17. 认证签名
  18. """
  19. def calSignature(signString, sk):
  20. mac = hmac.new(sk, signString, sha1)
  21. return mac.digest().encode('base64').rstrip()
本文导读目录