PolarSearch数据集成指南

PolarSearchPolarDB提供的一种分布式搜索服务,支持全文检索与向量检索,其架构基于OpenSearch(兼容Elasticsearch生态)并与PolarDB数据库紧密集成。您可以使用现有的应用程序客户端、数据处理工具和大数据框架,无需修改代码即可无缝连接和操作PolarSearch中的数据。

资源汇总

PolarSearch通过完全兼容OpenSearch/Elasticsearch的生态工具链,实现了与下述资源的无缝集成:

  • 日志处理:支持直接集成Logstash插件。

  • 大数据分析:支持SparkRay的分布式计算。

  • 数据同步:支持与ElasticsearchMaxCompute的双向数据同步。

日志处理

Logstash

您可以使用Logstashinputoutput插件与PolarSearch进行数据交互。

兼容版本

  • Logstash:7.10.2及以上。

  • logstash-output-opensearch:3.0.0及以上。

  • logstash-input-opensearch:3.0.0及以上。

配置示例

  • 输出数据到PolarSearch。

    output {
      opensearch {
        hosts => ["http://<polarsearch>:<port>"]
        index => "logstash-%{+YYYY.MM.dd}"
        user => "<user_name>"
        password => "<passwd>"
        ssl => false
        ssl_verify => false
      }
    }
  • PolarSearch读取数据。

    input {
      opensearch {
        hosts => ["http://<polarsearch>:<port>"]
        index => "my_index"
        query => '{ "query": { "match_all": {} } }'
        user => "<user_name>"
        password => "<passwd>"
        ssl => false
      }
    }

大数据分析

Spark

您可以通过Spark连接器直接读写PolarSearch中的数据。

配置示例

// 添加依赖(Spark 3.0+)
val df = spark.read
  .format("org.opensearch.spark.sql")
  .option("es.nodes", "<polarsearch>")
  .option("es.port", "<port>")
  .option("es.resource", "my_index/_search")
  .load()

df.show()

Ray

您可以结合Ray的分布式处理能力和PolarSearchREST API实现大规模数据操作。

配置示例(Python)

import ray
from opensearchpy import OpenSearch

ray.init()

@ray.remote
def process_data(chunk):
    client = OpenSearch("http://<polarsearch>:<port>")
    # 处理数据并写入 PolarSearch
    client.index(index="my_index", body=chunk)

# 分布式执行任务
futures = [process_data.remote(data_chunk) for data_chunk in data_list]
ray.get(futures)

MaxCompute

您可以通过DataX工具或DataWorks实现PolarSearchMaxCompute之间的数据同步。

DataX配置示例

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "odpsreader",
          "parameter": {
            "accessId": "<your_access_id>",
            "accessKey": "<your_access_key>",
            "project": "<your_project>",
            "table": "<your_maxcompute_table>"
          }
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "hosts": "<polarsearch>:<port>",
            "index": "my_index",
            "type": "_doc",
            "user": "<user_name>",
            "password": "<passwd>"
          }
        }
      }
    ]
  }
}

数据检索与分析

Elasticsearch

您可以使用elasticsearch-dump工具将数据从现有的Elasticsearch集群同步至PolarSearch。

  1. 导出数据到本地。

    elasticdump \
      --input=https://<polarsearch>:<port>/my_index \
      --output=/path/to/export.json \
      --type=data \
      --user <user_name>:<passwd>
  2. 导入数据到PolarSearch。

    elasticdump \
      --input=/path/to/export.json \
      --output=https://<polarsearch>:<port>/my_index \
      --type=data \
      --user <user_name>:<passwd>