# -*- coding: utf-8 -*-
import time
import random
import hmac
import base64
import copy
import urllib.request, urllib.parse, urllib.error
import http.client
import collections
import sys
import hashlib
class V3Api:
    """Sample client that signs and sends a bulk push request.

    The request is a ``POST`` to
    ``/v3/openapi/apps/<app_name>/<TABLE_NAME>/actions/bulk`` whose body is
    ``body_json``.  The ``Authorization`` header is an HMAC-SHA1 signature
    over the verb, selected headers and the canonicalized resource.
    """

    # Request constants.
    URI_PREFIX = '/v3/openapi/apps/'
    OS_PREFIX = 'OPENSEARCH'
    VERB = 'POST'
    # Name of the application table the data is pushed to; replace it with
    # the name of your own table.
    TABLE_NAME = 'tab'
    # Payload to upload; replace it with your own data.
    body_json = u'[{\"cmd\":\"ADD\",\"fields\":{\"id\":1,\"name\":\"测试推送数据\"}}]'

    def __init__(self, address='', port=''):
        """Remember the host and port later handed to ``HTTPConnection``."""
        self.address = address
        self.port = port

    def runQuery(self,
                 app_name=None,
                 access_key=None,
                 secret=None,
                 http_header=None,
                 http_params=None):
        """Send the signed request and return the raw response.

        Args:
            app_name: Application name appended to ``URI_PREFIX``.
            access_key: Access key id placed in the ``Authorization`` header.
            secret: Secret used as the HMAC key.
            http_header: Extra request headers (``None`` means no extras).
            http_params: Query parameters (``None`` means no parameters).

        Returns:
            A ``(status, headers, body_bytes)`` tuple.
        """
        query, header = self.buildQuery(app_name=app_name,
                                        access_key=access_key,
                                        secret=secret,
                                        http_header=http_header,
                                        http_params=http_params)
        conn = http.client.HTTPConnection(self.address, self.port)
        try:
            conn.request(self.VERB,
                         url=query,
                         body=self.body_json.encode('utf-8'),
                         headers=header)
            response = conn.getresponse()
            # Read the body before the connection is closed below.
            return response.status, response.getheaders(), response.read()
        finally:
            # Always release the socket, including when the request fails.
            conn.close()

    def buildQuery(self,
                   app_name=None,
                   access_key=None,
                   secret=None,
                   http_header=None,
                   http_params=None):
        """Build the request URL and the signed request headers.

        Returns:
            A ``(url, request_header)`` tuple.  ``url`` is the resource path,
            followed by ``?`` and the URL-encoded parameters when any
            parameters are given.
        """
        # None replaces the former mutable ``{}`` defaults.
        http_header = {} if http_header is None else http_header
        http_params = {} if http_params is None else http_params

        uri = self.URI_PREFIX
        if app_name is not None:
            uri += app_name
        uri += '/{TABLE_NAME}/actions/bulk'.format(TABLE_NAME=self.TABLE_NAME)

        query = '&'.join(
            urllib.parse.quote(key) + '=' + urllib.parse.quote(value)
            for key, value in http_params.items()
        )
        request_header = self.buildRequestHeader(uri=uri,
                                                 access_key=access_key,
                                                 secret=secret,
                                                 http_params=http_params,
                                                 http_header=http_header)
        # The query string must be separated from the path by '?'.
        url = uri + '?' + query if query else uri
        return url, request_header

    # Signature implementation.
    def buildAuthorization(self, uri, access_key, secret, http_params, request_header):
        """Return the ``Authorization`` header value for the request.

        The string to sign is the verb, ``Content-MD5``, ``Content-Type`` and
        ``Date`` (each followed by a newline), then the canonicalized
        ``X-Opensearch-*`` headers and the canonicalized resource.  It is
        signed with HMAC-SHA1 keyed by ``secret`` and base64-encoded.

        Returns:
            ``'OPENSEARCH <access_key>:<signature>'``.
        """
        canonicalized = self.VERB + '\n' \
            + self.__getHeader(request_header, 'Content-MD5', '') + '\n' \
            + self.__getHeader(request_header, 'Content-Type', '') + '\n' \
            + self.__getHeader(request_header, 'Date', '') + '\n' \
            + self.__canonicalizedHeaders(request_header) \
            + self.__canonicalizedResource(uri, http_params)
        signature_hmac = hmac.new(secret.encode('utf-8'),
                                  canonicalized.encode('utf-8'),
                                  'sha1')
        signature = base64.b64encode(signature_hmac.digest())
        return '%s %s%s%s' % (self.OS_PREFIX, access_key, ':', signature.decode('utf-8'))

    def __getHeader(self, header, key, default_value=None):
        """Return ``header[key]`` unless it is missing or ``None``."""
        value = header.get(key)
        return default_value if value is None else value

    def __canonicalizedResource(self, uri, http_params):
        """Return the quoted path plus the sorted, non-empty parameters.

        Parameters whose value is ``None`` or empty are skipped.  A ``?``
        separates the path from the parameters when at least one remains.
        """
        # quote() leaves '/' unescaped by default, so the path separators
        # survive as-is.
        canonicalized = urllib.parse.quote(uri)
        params = [
            urllib.parse.quote(key) + '=' + urllib.parse.quote(value)
            for key, value in sorted(http_params.items(), key=lambda item: item[0])
            if value
        ]
        if not params:
            return canonicalized
        return canonicalized + '?' + '&'.join(params)

    def generateDate(self, format="%Y-%m-%dT%H:%M:%SZ", timestamp=None):
        """Format a time as a string.

        Args:
            format: ``time.strftime`` format string.
            timestamp: Time tuple passed straight to ``time.strftime``; the
                current UTC time is used when it is ``None``.
        """
        if timestamp is None:
            timestamp = time.gmtime()
        return time.strftime(format, timestamp)

    def generateNonce(self):
        """Return a nonce: centisecond timestamp plus four random digits."""
        return str(int(time.time() * 100)) + str(random.randint(1000, 9999))

    def __canonicalizedHeaders(self, request_header):
        """Return the ``X-Opensearch-*`` headers in canonical form.

        Header names and values are stripped of spaces and tabs; headers with
        an empty value are dropped.  The rest are sorted by name and emitted
        as ``lowercased-name:value`` lines, each ending in a newline.
        """
        header = {}
        for key, value in request_header.items():
            if key is None or value is None:
                continue
            k = key.strip(' \t')
            v = value.strip(' \t')
            if k.startswith('X-Opensearch-') and v:
                header[k] = v
        return ''.join(
            name.lower() + ':' + header[name] + '\n' for name in sorted(header)
        )

    # Build the request headers.
    def buildRequestHeader(self, uri, access_key, secret, http_params, http_header):
        """Return a copy of ``http_header`` completed with the signed defaults.

        ``Content-MD5``, ``Content-Type``, ``Date``, ``X-Opensearch-Nonce``
        and ``Authorization`` are filled in when the caller did not supply
        them.  ``Content-MD5`` is forced to ``''`` when the body is blank.
        Headers whose value is ``None`` are removed from the result.
        """
        # Work on a copy so the caller's dictionary is never modified.
        request_header = copy.deepcopy(http_header)
        if not self.body_json.strip():
            request_header['Content-MD5'] = ''
        elif 'Content-MD5' not in request_header:
            request_header['Content-MD5'] = hashlib.md5(
                self.body_json.encode('utf-8')).hexdigest()
        if 'Content-Type' not in request_header:
            request_header['Content-Type'] = 'application/json'
        if 'Date' not in request_header:
            request_header['Date'] = self.generateDate()
        if 'X-Opensearch-Nonce' not in request_header:
            request_header['X-Opensearch-Nonce'] = self.generateNonce()
        # Sign last: the signature covers the headers set above.
        if 'Authorization' not in request_header:
            request_header['Authorization'] = self.buildAuthorization(uri,
                                                                      access_key,
                                                                      secret,
                                                                      http_params,
                                                                      request_header)
        return {key: value for key, value in request_header.items()
                if value is not None}
if __name__ == '__main__':
    # Placeholder credentials and application name; the literal values below
    # are meant to be replaced before running.
    key_id = '替换为AccessKeyId'
    key_secret = '替换为secret'
    target_app = '替换为应用名'
    # Host address: replace with the API address that serves the application,
    # for example the China East 1 region.
    endpoint = 'opensearch-cn-hangzhou.aliyuncs.com'
    client = V3Api(address=endpoint, port='80')
    # Push the payload and print the (status, headers, body) result.
    result = client.runQuery(app_name=target_app,
                             access_key=key_id,
                             secret=key_secret,
                             http_params={},
                             http_header={})
    print(result)