本文仅展示HTTP Endpoint部分核心代码,完整的代码请参见Python SDK中simple_http_notify_endpoint.py。
说明 示例中HTTP Endpoint的针对Authorization的校验不能完全规避掉恶意伪造的MNS请求,需要从业务层面上设计从发送端到HTTP Endpoint的安全校验机制。
class SimpleHttpNotifyEndpoint(BaseHTTPServer.BaseHTTPRequestHandler):
server_version = "SimpleHttpNotifyEndpoint/" + __version__
access_log_file = "access_log"
msg_type = "XML"
def do_POST(self):
content_length = int(self.headers.getheader('content-length', 0))
self.req_body = self.rfile.read(content_length)
self.msg = NotifyMessage()
logger.info("Headers:%s\nBody:%s" % (self.headers, self.req_body))
if not self.authenticate():
res_code = 403
res_content = "Access Forbidden"
logger.warning("Access Forbidden!\nHeaders:%s\nReqBody:%s\n" % (self.headers, self.req_body))
elif not self.validateBody(self.req_body, self.msg, self.msg_type):
res_code = 400
res_content = "Invalid Notify Message"
logger.warning("Invalid Notify Message!\nHeaders:%s\nReqBody:%s\n" % (self.headers, self.req_body))
else:
res_code = 201
res_content = ""
logger.info("Notify Message Succeed!\n%s" % self.msg)
self.access_log(res_code)
self.response(res_code, res_content)
def authenticate(self):
#get string to signature
service_str = "\n".join(sorted(["%s:%s" % (k,v) for k,v in self.headers.items() if k.startswith("x-mns-")]))
sign_header_list = []
for key in ["content-md5", "content-type", "date"]:
if key in self.headers.keys():
sign_header_list.append(self.headers.getheader(key))
else:
sign_header_list.append("")
str2sign = "%s\n%s\n%s\n%s" % (self.command, "\n".join(sign_header_list), service_str, self.path)
#verify
authorization = self.headers.getheader('Authorization')
signature = base64.b64decode(authorization)
cert_str = urllib2.urlopen(base64.b64decode(self.headers.getheader('x-mns-signing-cert-url'))).read()
pubkey = M2Crypto.X509.load_cert_string(cert_str).get_pubkey()
pubkey.reset_context(md='sha1')
pubkey.verify_init()
pubkey.verify_update(str2sign)
return pubkey.verify_final(signature)
def validateBody(self, data, msg, type):
if type == "XML":
return self.xml_decode(data, msg)
else:
msg.message = data
return True
def xml_decode(self, data, msg):
if data == "":
logger.error("Data is \"\".")
return False
try:
dom = xml.dom.minidom.parseString(data)
except Exception, e:
logger.error("Parse string fail, exception:%s" % e)
return False
node_list = dom.getElementsByTagName("Notification")
if not node_list:
logger.error("Get node of \"Notification\" fail:%s" % e)
return False
data_dic = {}
for node in node_list[0].childNodes:
if node.nodeName != "#text" and node.childNodes != []:
data_dic[node.nodeName] = str(node.childNodes[0].nodeValue.strip())
key_list = ["TopicOwner", "TopicName", "Subscriber", "SubscriptionName", "MessageId", "MessageMD5", "Message", "PublishTime"]
for key in key_list:
if key not in data_dic.keys():
logger.error("Check item fail. Need \"%s\"." % key)
return False
msg.topic_owner = data_dic["TopicOwner"]
msg.topic_name = data_dic["TopicName"]
msg.subscriber = data_dic["Subscriber"]
msg.subscription_name = data_dic["SubscriptionName"]
msg.message_id = data_dic["MessageId"]
msg.message_md5 = data_dic["MessageMD5"]
msg.message_tag = data_dic["MessageTag"] if data_dic.has_key("MessageTag") else ""
msg.message = data_dic["Message"]
msg.publish_time = data_dic["PublishTime"]
return True