python3 pushDemo

# -- coding: utf-8 --
import time
import random
import hmac
import base64
import copy
import urllib.request, urllib.parse, urllib.error
import http.client
import collections
import sys
import hashlib


class V3Api:

    #定义变量
    URI_PREFIX = '/v3/openapi/apps/'
    OS_PREFIX = 'OPENSEARCH'
    VERB = 'POST'
    #定义需推送到的应用表名,下面替换为数据需推送到的应用表名
    TABLE_NAME = 'tab'
    #定义上传数据,将下面待上传数据替换为自己的数据
    body_json = u'[{\"cmd\":\"ADD\",\"fields\":{\"id\":1,\"name\":\"测试推送数据\"}}]'

    def __init__(self, address = '', port = ''):
        self.address = address
        self.port = port

    def runQuery(self,
                 app_name = None,
                 access_key = None,
                 secret = None,
                 http_header = {}, 
                 http_params = {}):
        query, header = self.buildQuery(app_name = app_name,
                                        access_key = access_key,
                                        secret = secret,
                                        http_header = http_header,
                                        http_params = http_params)

        conn = http.client.HTTPConnection(self.address, self.port)
        conn.request(self.VERB, url = query,body = self.body_json.encode('utf-8'), headers = header)
        response = conn.getresponse()

        return response.status, response.getheaders(), response.read()


    def buildQuery(self,
                   app_name = None,
                   access_key = None,
                   secret = None,
                   http_header = {},
                   http_params = {}):
        uri = self.URI_PREFIX
        if app_name is not None:
            uri += app_name
        uri += '/{TABLE_NAME}/actions/bulk'.format(TABLE_NAME=self.TABLE_NAME)

        param = []
        for key, value in http_params.items():
            param.append(urllib.parse.quote(key) + '=' + urllib.parse.quote(value))

        query = ('&'.join(param))

        request_header = self.buildRequestHeader(uri = uri,
                                                 access_key = access_key,
                                                 secret = secret,
                                                 http_params = http_params,
                                                 http_header = http_header)

        return uri + query, request_header

    # 此处为签名代码实现
    def buildAuthorization(self, uri, access_key, secret, http_params, request_header):
        canonicalized = self.VERB + '\n'\
                      + self.__getHeader(request_header, 'Content-MD5', '') + '\n' \
                      + self.__getHeader(request_header, 'Content-Type', '') + '\n' \
                      + self.__getHeader(request_header, 'Date', '') + '\n' \
                      + self.__canonicalizedHeaders(request_header) \
                      + self.__canonicalizedResource(uri, http_params)

        signature_hmac = hmac.new(secret.encode('utf-8'), canonicalized.encode('utf-8'), 'sha1')
        signature = base64.b64encode(signature_hmac.digest())

        return '%s %s%s%s' %(self.OS_PREFIX, access_key, ':', signature.decode('utf-8'))


    # 添加不为空的header
    def __getHeader(self, header, key, default_value = None):
        if key in header and header[key] is not None:
            return header[key]
        return default_value

    def __canonicalizedResource(self, uri, http_params):
        canonicalized = urllib.parse.quote(uri).replace('%2F', '/')

        sorted_params = sorted(list(http_params.items()), key = lambda http_params : http_params[0])
        params = []
        for (key, value) in sorted_params:
            if value is None or len(value) == 0:
                continue

            params.append(urllib.parse.quote(key) + '=' + urllib.parse.quote(value))

        return canonicalized + '&'.join(params)


    def generateDate(self, format = "%Y-%m-%dT%H:%M:%SZ", timestamp = None):
        if timestamp is None:
            return time.strftime(format, time.gmtime())
        else:
            return time.strftime(format, timestamp)


    def generateNonce(self):
        return str(int(time.time()*100)) + str(random.randint(1000, 9999))


    def __canonicalizedHeaders(self, request_header):
        header = {}
        for key, value in request_header.items():
            if key is None or value is None:
                continue
            k = key.strip(' \t')
            v = value.strip(' \t')
            if k.startswith('X-Opensearch-') and len(v) > 0:
                header[k] = v

        if len(header) == 0:
            return ''

        sorted_header = sorted(list(header.items()), key=lambda header: header[0])
        canonicalized = ''
        for (key, value) in sorted_header:
            canonicalized += (key.lower() + ':' + value + '\n')

        return canonicalized

    # 构建Request Header
    def buildRequestHeader(self, uri, access_key, secret, http_params, http_header):
        request_header = copy.deepcopy(http_header)
        if not self.body_json.strip():
            request_header['Content-MD5'] = ''
        else:
            if 'Content-MD5' not in request_header:
                request_header['Content-MD5'] = hashlib.md5(self.body_json.encode('utf-8')).hexdigest()
        if 'Content-Type' not in request_header:
            request_header['Content-Type'] = 'application/json'
        if 'Date' not in request_header:
            request_header['Date'] = self.generateDate()
        if 'X-Opensearch-Nonce' not in request_header:
            request_header['X-Opensearch-Nonce'] = self.generateNonce()
        if 'Authorization' not in request_header:
            request_header['Authorization'] = self.buildAuthorization(uri,
                                                                      access_key,
                                                                      secret,
                                                                      http_params,
                                                                      request_header)
        key_del = []
        for key, value in request_header.items():
            if value is None:
                key_del.append(key)

        for key in key_del:
            del request_header[key]

        return request_header


if __name__ == '__main__':
    accesskey_id = '替换为AccessKeyId'
    accesskey_secret = '替换为secret'
    # 下面host地址,替换为访问对应应用api地址,例如华东1区
    internet_host = 'opensearch-cn-hangzhou.aliyuncs.com'
    appname = '替换为应用名'

    api = V3Api(address = internet_host, port = '80')
    print(api.runQuery(app_name = appname, access_key=accesskey_id, secret=accesskey_secret, http_params={}, http_header={}))


阿里云首页 智能开放搜索 OpenSearch 相关技术圈