Demo code for pushing documents

Use this Python sample to push documents to an OpenSearch application. The sample demonstrates three operations: add, update, and delete.

Prerequisites

Before you begin, set up credentials and install the dependencies as described in the following sections.

Set up credentials

The sample reads credentials from environment variables to avoid hardcoding sensitive values in source code.

Important

Use a RAM user's AccessKey pair to call API operations rather than the Alibaba Cloud account's AccessKey pair. The account's AccessKey pair grants access to all API operations and must be kept secure.

Set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables:

  • Linux and macOS — replace <access_key_id> and <access_key_secret> with your RAM user's credentials:

    export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
  • Windows — create an environment variable file, add ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET with the corresponding values, and then restart Windows for the changes to take effect.
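
A missing variable makes os.environ.get return None, which typically only surfaces later as an authentication error. The following is a minimal sketch of an early check, assuming the two variable names above. The helper name load_credentials is hypothetical and is not part of any Alibaba Cloud SDK.

```python
import os

# The two variable names used throughout this page.
REQUIRED_VARS = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")


def load_credentials(env=None):
    """Return (access_key_id, access_key_secret), or raise if either is unset or empty.

    Hypothetical helper for illustration; `env` defaults to os.environ and can be
    replaced with a plain dict in tests.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env[REQUIRED_VARS[0]], env[REQUIRED_VARS[1]]
```

In the sample code, the two os.environ.get calls could be replaced with access_key_id, access_key_secret = load_credentials().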

Install dependencies

pip install alibabacloud_tea_util
pip install alibabacloud_opensearch_util
pip install alibabacloud_credentials

The sample also imports Config and Client from the BaseRequest module. For information about BaseRequest, see Demo code for using the client.

Document operations

The bulk push endpoint accepts a list of document objects. Each object has a cmd field that specifies the operation:

cmd value    Behavior
ADD          Adds a new document.
UPDATE       Updates an existing document.
DELETE       Removes the document matching the specified primary key.

Each document object can include an optional timestamp field (milliseconds since epoch). When multiple operations target the same primary key, OpenSearch applies them in the order given by their timestamp values. Without a timestamp, OpenSearch applies operations in the order it receives them.
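
The document shape described above can be sketched as a small builder. This is an illustration only: make_doc and VALID_CMDS are hypothetical names, and the check against the three cmd values happens locally, before any request is sent. It does not reflect validation performed by OpenSearch.

```python
import time
from typing import Any, Dict, Optional

# The three cmd values listed in the table above.
VALID_CMDS = {"ADD", "UPDATE", "DELETE"}


def make_doc(cmd: str, fields: Dict[str, Any],
             timestamp_ms: Optional[int] = None) -> Dict[str, Any]:
    """Build one document object for the bulk push body (hypothetical helper)."""
    if cmd not in VALID_CMDS:
        raise ValueError(f"cmd must be one of {sorted(VALID_CMDS)}, got {cmd!r}")
    doc: Dict[str, Any] = {"cmd": cmd, "fields": fields}
    if timestamp_ms is not None:
        # Optional: milliseconds since epoch, used to order operations
        # that target the same primary key.
        doc["timestamp"] = timestamp_ms
    return doc


base = int(time.time() * 1000)
batch = [
    # Two operations on primary key "1": the UPDATE carries the later timestamp.
    make_doc("ADD", {"id": "1", "title": "opensearch"}, base),
    make_doc("UPDATE", {"id": "1", "title": "OpenSearch"}, base + 1),
    # No timestamp: applied in the order OpenSearch receives it.
    make_doc("DELETE", {"id": "2"}),
]
```

The resulting batch list has the same structure as the documents list in the sample code that follows.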

Sample code

# -*- coding: utf-8 -*-
import os
import time
from typing import Dict, Any
from Tea.exceptions import TeaException
from Tea.request import TeaRequest
from alibabacloud_tea_util import models as util_models
from BaseRequest import Config, Client

class opensearch:
    def __init__(self, config: Config):
        self.Clients = Client(config=config)
        self.runtime = util_models.RuntimeOptions(
            connect_timeout=10000,
            read_timeout=10000,
            autoretry=False,
            ignore_ssl=False,
            max_idle_conns=50,
            max_attempts=3
        )
        self.header = {}

    def docBulk(self, app_name: str, table_name: str, doc_content: list) -> Dict[str, Any]:
        try:
            response = self.Clients._request(
                method="POST",
                pathname=f'/v3/openapi/apps/{app_name}/{table_name}/actions/bulk',
                query={},
                headers=self.header,
                body=doc_content,
                runtime=self.runtime
            )
            return response
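        except Exception as e:
            # The error is only printed; the method then returns None,
            # so callers should check the result before using it.
            print(e)
            return None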

if __name__ == "__main__":
    # Endpoint of the OpenSearch API — do not include the http:// prefix
    endpoint = "<endpoint>"

    # Request protocol: HTTP or HTTPS
    endpoint_protocol = "HTTP"

    # Read credentials from environment variables
    access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID")
    access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

    # Authentication method:
    #   access_key — use an AccessKey pair directly (default)
    #   sts        — use RAM and Security Token Service (STS) temporary credentials
    auth_type = "sts"

    # Required only when auth_type is "sts"
    # Call the AssumeRole operation of Alibaba Cloud RAM to obtain an STS token
    security_token = "<security_token>"

    Configs = Config(
        endpoint=endpoint,
        access_key_id=access_key_id,
        access_key_secret=access_key_secret,
        security_token=security_token,
        type=auth_type,
        protocol=endpoint_protocol
    )

    ops = opensearch(Configs)
    app_name = "app_name"
    table_name = "table_name"

    # --------------- Push documents ---------------

    # ADD with timestamp: controls update order for documents with the same primary key
    document1 = {"cmd": "ADD", "timestamp": int(time.time() * 1000), "fields": {"id": "1", "title": "opensearch"}}

    # ADD without timestamp: OpenSearch applies the operation in the order it receives it
    document2 = {"cmd": "ADD", "fields": {"id": 2, "describe": "123456"}}

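    # DELETE: removes the document whose primary key (id) matches
    deletedoc = {"cmd": "DELETE", "fields": {"id": 2}}

    # UPDATE: changes fields of the existing document with the same primary key
    updatedoc = {"cmd": "UPDATE", "fields": {"id": 2, "describe": "6666", "title": "OpenSearch"}}

    # Only the two ADD documents are pushed here; add updatedoc or deletedoc to the list to send them too
    documents = [document1, document2]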
    res5 = ops.docBulk(app_name=app_name, table_name=table_name, doc_content=documents)
    print(res5)
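
In the sample above, docBulk prints any exception and returns None, and the update and delete documents are defined but never sent. The following sketch shows one way to send a follow-up batch and fail loudly when no response comes back. push_or_fail and fake_bulk are hypothetical names. fake_bulk is a stand-in so the block runs without network access, and its return value is not the real response format.

```python
from typing import Any, Callable, Dict, List, Optional


def push_or_fail(bulk_fn: Callable[..., Optional[Dict[str, Any]]],
                 app_name: str, table_name: str,
                 docs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Call a docBulk-style function and raise if it returns None (hypothetical helper)."""
    if not docs:
        raise ValueError("docs must contain at least one document")
    result = bulk_fn(app_name=app_name, table_name=table_name, doc_content=docs)
    if result is None:
        raise RuntimeError(f"bulk push to {app_name}/{table_name} returned no response")
    return result


def fake_bulk(app_name: str, table_name: str,
              doc_content: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Stand-in for ops.docBulk; only reports how many documents it received."""
    return {"received": len(doc_content)}


updatedoc = {"cmd": "UPDATE", "fields": {"id": 2, "describe": "6666", "title": "OpenSearch"}}
deletedoc = {"cmd": "DELETE", "fields": {"id": 2}}
print(push_or_fail(fake_bulk, "app_name", "table_name", [updatedoc, deletedoc]))
```

With the real client, the call would be push_or_fail(ops.docBulk, app_name, table_name, [updatedoc, deletedoc]).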

What's next

  • Process data — learn how to handle and transform data after pushing documents to OpenSearch.