Use this Python sample to push documents to an OpenSearch application. The sample demonstrates three operations: add, update, and delete.
Prerequisites
Before you begin, ensure that you have:
An OpenSearch application with at least one table
A Resource Access Management (RAM) user with the required permissions. For details, see AliyunServiceRoleForOpenSearch and Access authorization rules
Set up credentials
The sample reads credentials from environment variables to avoid hardcoding sensitive values in source code.
Use a RAM user's AccessKey pair to call API operations rather than the Alibaba Cloud account's AccessKey pair. The account's AccessKey pair grants access to all API operations and must be kept secure.
To create a RAM user, see Create a RAM user.
To create an AccessKey pair, see Create an AccessKey pair.
Set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables:
Linux and macOS: replace <access_key_id> and <access_key_secret> with your RAM user's credentials:
export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
Windows: create an environment variable file, add ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET with the corresponding values, and then restart Windows for the changes to take effect.
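Before running the sample, it can help to confirm that both variables are visible to Python. The following is a minimal sketch, not part of the OpenSearch SDK; the helper name missing_credentials is hypothetical, and only the two variable names come from this page.

```python
import os

# The two variable names this page asks you to set.
REQUIRED_VARS = (
    "ALIBABA_CLOUD_ACCESS_KEY_ID",
    "ALIBABA_CLOUD_ACCESS_KEY_SECRET",
)


def missing_credentials(environ=None):
    """Return the names of required variables that are unset or empty.

    Hypothetical helper for illustration; pass a dict to check something
    other than the real process environment.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling missing_credentials() with no arguments checks the current process environment; an empty list means both variables are set.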
Install dependencies
pip install alibabacloud_tea_util
pip install alibabacloud_opensearch_util
pip install alibabacloud_credentials
For information about BaseRequest, see Demo code for using the client.
Document operations
The bulk push endpoint accepts a list of document objects. Each object has a cmd field that specifies the operation:
| cmd value | Behavior |
|---|---|
| ADD | Adds a new document. |
| UPDATE | Updates an existing document. |
| DELETE | Removes the document matching the specified primary key. |
Each document object can include an optional timestamp field (milliseconds since epoch). When multiple operations target the same primary key, OpenSearch uses the timestamp values to determine the order in which those operations are applied. Without a timestamp, OpenSearch applies operations in the order it receives them.
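The document objects described above can be assembled with a small helper. This is a sketch under stated assumptions: build_doc and VALID_CMDS are hypothetical names introduced for illustration, while the cmd, fields, and timestamp keys and the example field values follow the sample code on this page.

```python
import time
from typing import Any, Dict, Optional

# The three cmd values listed in the table above.
VALID_CMDS = {"ADD", "UPDATE", "DELETE"}


def build_doc(cmd: str, fields: Dict[str, Any],
              timestamp_ms: Optional[int] = None) -> Dict[str, Any]:
    """Build one document object for the bulk push body (hypothetical helper)."""
    cmd = cmd.upper()
    if cmd not in VALID_CMDS:
        raise ValueError(f"Unsupported cmd: {cmd!r}")
    doc: Dict[str, Any] = {"cmd": cmd, "fields": fields}
    # The timestamp is optional; omit the key entirely when none is given.
    if timestamp_ms is not None:
        doc["timestamp"] = timestamp_ms
    return doc


# Three operations on the same primary key, with increasing timestamps.
now_ms = int(time.time() * 1000)
batch = [
    build_doc("ADD", {"id": 2, "describe": "123456"}, now_ms),
    build_doc("UPDATE", {"id": 2, "describe": "6666"}, now_ms + 1),
    build_doc("DELETE", {"id": 2}, now_ms + 2),
]
```

The resulting batch list has the same shape as the documents list that the sample passes as the request body.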
Sample code
# -*- coding: utf-8 -*-
import os
import time
from typing import Dict, Any

from Tea.exceptions import TeaException
from Tea.request import TeaRequest
from alibabacloud_tea_util import models as util_models

from BaseRequest import Config, Client


class opensearch:
    def __init__(self, config: Config):
        self.Clients = Client(config=config)
        # Timeouts are in milliseconds
        self.runtime = util_models.RuntimeOptions(
            connect_timeout=10000,
            read_timeout=10000,
            autoretry=False,
            ignore_ssl=False,
            max_idle_conns=50,
            max_attempts=3
        )
        self.header = {}

    def docBulk(self, app_name: str, table_name: str, doc_content: list) -> Dict[str, Any]:
        """Push a list of document objects to the specified table."""
        try:
            response = self.Clients._request(
                method="POST",
                pathname=f'/v3/openapi/apps/{app_name}/{table_name}/actions/bulk',
                query={},
                headers=self.header,
                body=doc_content,
                runtime=self.runtime
            )
            return response
        except Exception as e:
            # The error is only printed, so the method returns None on failure
            print(e)


if __name__ == "__main__":
    # Endpoint of the OpenSearch API; do not include the http:// prefix
    endpoint = "<endpoint>"
    # Request protocol: HTTP or HTTPS
    endpoint_protocol = "HTTP"
    # Read credentials from environment variables
    access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID")
    access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    # Authentication method:
    #   access_key: use an AccessKey pair directly (default)
    #   sts: use RAM and Security Token Service (STS) temporary credentials
    auth_type = "sts"
    # Required only when auth_type is "sts"
    # Call the AssumeRole operation of Alibaba Cloud RAM to obtain an STS token
    security_token = "<security_token>"

    Configs = Config(
        endpoint=endpoint,
        access_key_id=access_key_id,
        access_key_secret=access_key_secret,
        security_token=security_token,
        type=auth_type,
        protocol=endpoint_protocol
    )
    ops = opensearch(Configs)

    app_name = "app_name"
    table_name = "table_name"

    # --------------- Push documents ---------------
    # ADD with timestamp: controls update order for documents with the same primary key
    document1 = {"cmd": "ADD", "timestamp": int(time.time() * 1000), "fields": {"id": "1", "title": "opensearch"}}
    # ADD without timestamp: OpenSearch applies the operation in the order it receives it
    document2 = {"cmd": "ADD", "fields": {"id": 2, "describe": "123456"}}
    # DELETE: removes the document with the matching primary key
    deletedoc = {"cmd": "DELETE", "fields": {"id": 2}}
    # UPDATE: updates fields of an existing document
    updatedoc = {"cmd": "UPDATE", "fields": {"id": 2, "describe": "6666", "title": "OpenSearch"}}

    # Only the two ADD commands are pushed here. To update or delete,
    # include updatedoc or deletedoc in the list.
    documents = [document1, document2]
    res5 = ops.docBulk(app_name=app_name, table_name=table_name, doc_content=documents)
    print(res5)
What's next
Process data — learn how to handle and transform data after pushing documents to OpenSearch.