全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
消息服务

HttpEndpoint 示例代码

更新时间:2017-06-07 13:26:11

这里仅展示 HttpEndpoint 的部分核心代码,完整代码请参考 Python SDK 中的 simple_http_notify_endpoint.py。

  class SimpleHttpNotifyEndpoint(BaseHTTPServer.BaseHTTPRequestHandler):
      """HTTP request handler that receives MNS notifications sent via POST.

      Each request is authenticated by verifying the signature in the
      ``Authorization`` header, then the body is validated (as XML when
      ``msg_type`` is ``"XML"``) and decoded into a ``NotifyMessage``.

      This excerpt shows only the core methods; ``access_log`` and
      ``response`` are called here but defined in the full SDK sample.
      """

      server_version = "SimpleHttpNotifyEndpoint/" + __version__
      access_log_file = "access_log"
      # Body format expected from the publisher; anything other than "XML"
      # is stored verbatim as the message text (see validateBody).
      msg_type = "XML"

      def do_POST(self):
          """Handle one notification: authenticate, validate, log and reply.

          Replies 403 when the signature check fails, 400 when the body is
          not a valid notification, and 201 with an empty body on success.
          """
          content_length = int(self.headers.getheader('content-length', 0))
          self.req_body = self.rfile.read(content_length)
          self.msg = NotifyMessage()
          logger.info("Headers:%s\nBody:%s" % (self.headers, self.req_body))

          if not self.authenticate():
              res_code = 403
              res_content = "Access Forbidden"
              logger.warning("Access Forbidden!\nHeaders:%s\nReqBody:%s\n" % (self.headers, self.req_body))
          elif not self.validateBody(self.req_body, self.msg, self.msg_type):
              res_code = 400
              res_content = "Invalid Notify Message"
              logger.warning("Invalid Notify Message!\nHeaders:%s\nReqBody:%s\n" % (self.headers, self.req_body))
          else:
              res_code = 201
              res_content = ""
              logger.info("Notify Message Succeed!\n%s" % self.msg)

          self.access_log(res_code)
          self.response(res_code, res_content)

      def authenticate(self):
          """Verify the request signature carried in the Authorization header.

          The signed string is rebuilt from the HTTP method, the
          ``content-md5`` / ``content-type`` / ``date`` headers (empty string
          when absent), the sorted ``x-mns-*`` headers and the request path.
          It is checked with the public key of the certificate downloaded
          from the base64-encoded ``x-mns-signing-cert-url`` header.

          Returns:
              True only when the signature verifies. False when a required
              header is missing or is not valid base64, when the certificate
              cannot be downloaded, or when verification does not succeed.
          """
          # Build the string to sign.
          service_str = "\n".join(sorted(
              "%s:%s" % (k, v)
              for k, v in self.headers.items() if k.startswith("x-mns-")))
          sign_header_list = [
              self.headers.getheader(key, "")
              for key in ("content-md5", "content-type", "date")]
          str2sign = "%s\n%s\n%s\n%s" % (
              self.command, "\n".join(sign_header_list), service_str, self.path)

          # A request without these headers cannot be verified; reject it
          # rather than letting b64decode(None) raise.
          authorization = self.headers.getheader('Authorization')
          cert_url_b64 = self.headers.getheader('x-mns-signing-cert-url')
          if not authorization or not cert_url_b64:
              logger.warning("Missing Authorization or x-mns-signing-cert-url header.")
              return False

          try:
              signature = base64.b64decode(authorization)
              cert_url = base64.b64decode(cert_url_b64)
          except (TypeError, ValueError) as e:
              # Python 2 raises TypeError on bad base64 input; binascii.Error
              # on newer interpreters is a ValueError subclass.
              logger.warning("Decode signature headers fail, exception:%s" % e)
              return False

          # NOTE(review): cert_url is taken from the request itself. Unless it
          # is checked against the trusted certificate location, a sender can
          # point it at a certificate of their own -- confirm the expected
          # host with the service documentation and validate it here.
          try:
              cert_resp = urllib2.urlopen(cert_url)
              try:
                  cert_str = cert_resp.read()
              finally:
                  cert_resp.close()
          except (IOError, ValueError) as e:
              logger.warning("Fetch signing certificate fail, exception:%s" % e)
              return False

          # Verify the signature over str2sign with SHA1.
          pubkey = M2Crypto.X509.load_cert_string(cert_str).get_pubkey()
          pubkey.reset_context(md='sha1')
          pubkey.verify_init()
          pubkey.verify_update(str2sign)
          # Only an explicit 1 counts as success; any other result (including
          # a negative error code, which would be truthy) must not pass.
          return pubkey.verify_final(signature) == 1

      def validateBody(self, data, msg, type):
          """Validate the request body and fill ``msg`` from it.

          Args:
              data: raw request body.
              msg: message object to populate.
              type: body format; ``"XML"`` is parsed with ``xml_decode``, any
                  other value stores ``data`` unchanged in ``msg.message``.

          Returns:
              True when the body was accepted, False otherwise.
          """
          if type == "XML":
              return self.xml_decode(data, msg)
          msg.message = data
          return True

      def xml_decode(self, data, msg):
          """Parse an XML notification and copy its fields onto ``msg``.

          Args:
              data: XML document containing a ``Notification`` element.
              msg: message object whose attributes are populated.

          Returns:
              True on success. False when ``data`` is empty, is not
              well-formed XML, has no ``Notification`` element, or lacks one
              of the required child elements.
          """
          if not data:
              logger.error("Data is \"\".")
              return False
          try:
              dom = xml.dom.minidom.parseString(data)
          except Exception as e:
              logger.error("Parse string fail, exception:%s" % e)
              return False

          node_list = dom.getElementsByTagName("Notification")
          if not node_list:
              # No exception object exists on this path, so nothing to append.
              logger.error("Get node of \"Notification\" fail.")
              return False

          data_dic = {}
          for node in node_list[0].childNodes:
              # Skip whitespace text nodes and anything without children
              # (empty elements, comments); the truthiness test also covers
              # minidom's tuple-based empty node list.
              if node.nodeName == "#text" or not node.childNodes:
                  continue
              value = node.childNodes[0].nodeValue
              if value is None:
                  # First child is not a text node; no value to record.
                  continue
              data_dic[node.nodeName] = str(value.strip())

          key_list = ["TopicOwner", "TopicName", "Subscriber", "SubscriptionName",
                      "MessageId", "MessageMD5", "Message", "PublishTime"]
          for key in key_list:
              if key not in data_dic:
                  logger.error("Check item fail. Need \"%s\"." % key)
                  return False

          msg.topic_owner = data_dic["TopicOwner"]
          msg.topic_name = data_dic["TopicName"]
          msg.subscriber = data_dic["Subscriber"]
          msg.subscription_name = data_dic["SubscriptionName"]
          msg.message_id = data_dic["MessageId"]
          msg.message_md5 = data_dic["MessageMD5"]
          # MessageTag is optional; default to an empty string.
          msg.message_tag = data_dic.get("MessageTag", "")
          msg.message = data_dic["Message"]
          msg.publish_time = data_dic["PublishTime"]
          return True
本文导读目录