push推送Demo

引入依赖

pip install alibabacloud_tea_util 
pip install alibabacloud_opensearch_util
pip install alibabacloud_credentials

BaseRequest参考:Python client 示例

推送示例

# -*- coding: utf-8 -*-
import time
from typing import Dict, Any
from Tea.exceptions import TeaException
from Tea.request import TeaRequest
from alibabacloud_tea_util import models as util_models
from BaseRequest import Config, Client
class opensearch:
    def __init__(self, config: Config):
        self.Clients = Client(config=config)
        self.runtime = util_models.RuntimeOptions(
            connect_timeout=10000,
            read_timeout=10000,
            autoretry=False,
            ignore_ssl=False,
            max_idle_conns=50,
            max_attempts=3
        )
        self.header = {}
    def docBulk(self, app_name: str, table_name: str, doc_content: list) -> Dict[str, Any]:
        try:
            response = self.Clients._request(method="POST",
                                             pathname=f'/v3/openapi/apps/{app_name}/{table_name}/actions/bulk',query={},headers = self.header,
                                             body=doc_content, runtime=self.runtime)
            return response
        except Exception as e:
            print(e)
if __name__ == "__main__":
    # 配置统一的请求入口 注意:host需要去掉http://
    endpoint = "<endpoint>"
    # 支持 protocol 配置 HTTPS/HTTP
    endpoint_protocol = "HTTP"
    # 用户识别信息
    access_key_id = "<access_key_id>"
    access_key_secret = "<access_key_secret>"
    # 支持 type 配置 sts/access_key 鉴权. 其中 type 默认为 access_key 鉴权. 使用 sts 可配置 RAM-STS 鉴权.
    # 备选参数为:  sts 或者 access_key
    auth_type = "sts"
    # 如果使用 RAM-STS 鉴权, 请配置 security_token, 可使用 阿里云 AssumeRole 获取 相关 STS 鉴权结构.
    security_token = "<security_token>"
    # 配置请求使用的通用信息.
    #注意:security_token和type参数,如果不是子账号需要省略
    Configs = Config(endpoint=endpoint, access_key_id=access_key_id, access_key_secret=access_key_secret,
                     security_token=security_token, type=auth_type, protocol=endpoint_protocol)
    # 创建 opensearch 实例
    ops = opensearch(Configs)
    app_name = "app_name"
    table_name = "table_name"
    
  # ---------------  文档推送 ---------------
    
    # timestamp 信息 用以增加对 文档操作的保序能力. 系统会用该时间戳来作为同一主键文档更新顺序的判断标准.
    # 在没有该timestamp项时,默认以文档发送到OpenSearch的时间作为文档更新时间进行操作。
    document1 = {"cmd": "ADD", "timestamp": int(time.time() * 1000), "fields": {"id": "1", "title": "opensearch"}}
    document2 = {"cmd": "ADD", "fields": {"id": 2, "describe": "123456"}}
    #删除记录
    deletedoc={"cmd": "DELETE", "fields": {"id": 2}}
    #更新记录
    updatedoc={"cmd": "UPDATE", "fields": {"id": 2, "describe": "6666","title": "开放搜索"}}
    documents = [document1,document2]
    res5 = ops.docBulk(app_name=app_name, table_name=table_name, doc_content=documents)
    print(res5)
说明

参考:数据处理

阿里云首页 智能开放搜索 OpenSearch 相关技术圈