# -- coding: utf-8 --
import time
import random
import hmac
import base64
import copy
import urllib.request, urllib.parse, urllib.error
import http.client
import collections
import sys
from hashlib import sha1
class V3Api:
    """Small client that signs and sends OpenSearch V3 search requests.

    A request is a plain-HTTP ``GET`` to ``/v3/openapi/apps/<app>/search``.
    The ``Authorization`` header carries an HMAC-SHA1 signature computed over
    the verb, a few standard headers, every ``X-Opensearch-*`` header and the
    canonicalized URI plus query parameters.
    """

    # Shared request constants.
    URI_PREFIX = '/v3/openapi/apps/'
    OS_PREFIX = 'OPENSEARCH'
    VERB = 'GET'

    def __init__(self, address='', port=''):
        """Remember the host (``address``) and ``port`` to connect to."""
        self.address = address
        self.port = port

    def runQuery(self,
                 app_name=None,
                 access_key=None,
                 secret=None,
                 http_header=None,
                 http_params=None):
        """Send the signed search request and return the raw response.

        Args:
            app_name: Application name inserted into the request path.
            access_key: Access key id placed in the ``Authorization`` header.
            secret: Secret used as the HMAC key.
            http_header: Optional mapping of extra/overriding request headers.
            http_params: Optional mapping of query parameters (str -> str).

        Returns:
            A ``(status, headers, body)`` tuple where ``headers`` is the list
            returned by ``HTTPResponse.getheaders()`` and ``body`` is bytes.
        """
        query, header = self.buildQuery(app_name=app_name,
                                        access_key=access_key,
                                        secret=secret,
                                        http_header=http_header,
                                        http_params=http_params)
        # An empty port (the constructor default) is passed as None so that
        # http.client falls back to its default port instead of failing.
        conn = http.client.HTTPConnection(self.address, self.port or None)
        try:
            conn.request(self.VERB, url=query, headers=header)
            response = conn.getresponse()
            # The body is read before the connection is closed in `finally`.
            return response.status, response.getheaders(), response.read()
        finally:
            conn.close()

    def buildQuery(self,
                   app_name=None,
                   access_key=None,
                   secret=None,
                   http_header=None,
                   http_params=None):
        """Build the request target and the signed headers.

        Parameters whose value is ``None`` are left out of the query string;
        the signing code skips them as well, so both sides stay consistent.

        Returns:
            A ``(url, headers)`` tuple: ``url`` is the path followed by ``?``
            and the percent-encoded query string, ``headers`` is a new dict.
        """
        http_header = {} if http_header is None else http_header
        http_params = {} if http_params is None else http_params

        uri = self.URI_PREFIX
        if app_name is not None:
            uri += app_name
        uri += '/search'

        query = '&'.join(
            urllib.parse.quote(key) + '=' + urllib.parse.quote(value)
            for key, value in http_params.items()
            if value is not None
        )
        request_header = self.buildRequestHeader(uri=uri,
                                                 access_key=access_key,
                                                 secret=secret,
                                                 http_params=http_params,
                                                 http_header=http_header)
        return uri + '?' + query, request_header

    # Signature implementation.
    def buildAuthorization(self, uri, access_key, secret, http_params, request_header):
        """Return the ``Authorization`` header value for the request.

        The string to sign is the verb, ``Content-MD5``, ``Content-Type`` and
        ``Date`` (each followed by a newline), then the canonicalized
        ``X-Opensearch-*`` headers and the canonicalized resource. It is
        signed with HMAC-SHA1 keyed by ``secret`` and base64-encoded.

        Returns:
            ``'OPENSEARCH <access_key>:<base64 signature>'``.
        """
        string_to_sign = '\n'.join([
            self.VERB,
            self.__getHeader(request_header, 'Content-MD5', ''),
            self.__getHeader(request_header, 'Content-Type', ''),
            self.__getHeader(request_header, 'Date', ''),
            self.__canonicalizedHeaders(request_header)
            + self.__canonicalizedResource(uri, http_params),
        ])
        digest = hmac.new(secret.encode('utf-8'),
                          string_to_sign.encode('utf-8'),
                          'sha1').digest()
        signature = base64.b64encode(digest).decode('utf-8')
        return '%s %s%s%s' % (self.OS_PREFIX, access_key, ':', signature)

    def __getHeader(self, header, key, default_value=None):
        """Return ``header[key]``, or ``default_value`` if missing or None."""
        value = header.get(key)
        return default_value if value is None else value

    def __canonicalizedResource(self, uri, http_params):
        """Return the quoted URI plus the sorted, fully-quoted parameters.

        Parameters with a ``None`` or empty value are not part of the
        signature. Values are quoted with ``safe=''`` so ``/`` is escaped too.
        """
        canonicalized = urllib.parse.quote(uri).replace('%2F', '/')
        items = (http_params or {}).items()
        params = [
            urllib.parse.quote(key) + '=' + urllib.parse.quote(value, safe='')
            for key, value in sorted(items, key=lambda item: item[0])
            if value
        ]
        return canonicalized + '?' + '&'.join(params)

    def generateDate(self, format="%Y-%m-%dT%H:%M:%SZ", timestamp=None):
        """Format ``timestamp`` (a time tuple) or the current UTC time."""
        if timestamp is None:
            timestamp = time.gmtime()
        return time.strftime(format, timestamp)

    def generateNonce(self):
        """Return a numeric nonce: centisecond timestamp + 4 random digits."""
        return str(int(time.time() * 100)) + str(random.randint(1000, 9999))

    def __canonicalizedHeaders(self, request_header):
        """Return the ``X-Opensearch-*`` headers in their signing form.

        Names and values are stripped of spaces/tabs; headers with an empty
        value are dropped. The result is one ``name:value\\n`` line per
        header, sorted by the original header name, with the name lowercased.
        An empty string is returned when no header qualifies.
        """
        signed = {}
        for key, value in request_header.items():
            if key is None or value is None:
                continue
            name = key.strip(' \t')
            content = value.strip(' \t')
            if name.startswith('X-Opensearch-') and content:
                signed[name] = content
        return ''.join(
            name.lower() + ':' + signed[name] + '\n' for name in sorted(signed)
        )

    # Build the request headers.
    def buildRequestHeader(self, uri, access_key, secret, http_params, http_header):
        """Return a copy of ``http_header`` completed with the signed headers.

        Missing ``Content-MD5``, ``Content-Type``, ``Date``,
        ``X-Opensearch-Nonce`` and ``Authorization`` entries are filled in;
        entries supplied by the caller are kept as given. Headers whose value
        is ``None`` are removed from the result. The input mapping is never
        modified.
        """
        request_header = {} if http_header is None else copy.deepcopy(http_header)
        request_header.setdefault('Content-MD5', '')
        request_header.setdefault('Content-Type', 'application/json')
        if 'Date' not in request_header:
            request_header['Date'] = self.generateDate()
        if 'X-Opensearch-Nonce' not in request_header:
            request_header['X-Opensearch-Nonce'] = self.generateNonce()
        # Authorization is computed last because it signs the headers above.
        if 'Authorization' not in request_header:
            request_header['Authorization'] = self.buildAuthorization(
                uri, access_key, secret, http_params, request_header)
        return {
            key: value
            for key, value in request_header.items()
            if value is not None
        }
if __name__ == '__main__':
    # Host of the API endpoint for the application's region; replace as
    # needed (this example points at the cn-hangzhou endpoint).
    internet_host = 'opensearch-cn-hangzhou.aliyuncs.com'
    api = V3Api(address=internet_host, port='80')

    # Placeholder credentials and application name: replace before running.
    credentials = {
        'app_name': '替换为应用名',
        'access_key': '替换为AccessKeyId',
        'secret': '替换为secret',
    }

    # Search request: the `query` parameter holds the query sub-clauses;
    # further parameters are added alongside it, as `fetch_fields` is here.
    query_subsentences_params = {}
    query_subsentences_params['query'] = "query=name:'搜索'&&config=start:0,hit:1,format:json&&sort=+id"
    query_subsentences_params['fetch_fields'] = 'id;name'

    result = api.runQuery(http_params=query_subsentences_params,
                          http_header={},
                          **credentials)
    print(result)