访问Elasticsearch数据源

本文介绍如何使用云原生数据仓库 AnalyticDB MySQL 版Spark通过ENI网络读取Elasticsearch数据源。

前提条件

  • AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版

  • 已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组

  • 已创建数据库账号。

  • AnalyticDB for MySQL集群与OSS存储空间位于相同地域。

  • AnalyticDB for MySQL集群与阿里云Elasticsearch实例位于同一地域。具体操作,请参见创建阿里云Elasticsearch实例

  • 已将AnalyticDB for MySQL的IP地址添加至阿里云Elasticsearch实例的白名单中。具体操作,请参见配置实例公网或私网访问白名单

准备工作

  1. 阿里云Elasticsearch控制台基本信息页面,获取交换机ID。

  2. ECS管理控制台安全组页面,获取阿里云Elasticsearch实例所属的安全组ID。如未添加安全组,请参见创建安全组

使用Scala连接阿里云Elasticsearch

  1. 下载与阿里云Elasticsearch实例版本对应的JAR包,下载链接,请参见Elasticsearch Spark。本文下载的示例JAR包为Elasticsearch-spark-30_2.12-7.17.9.jar。

  2. 在pom.xml文件的dependencies中添加依赖项。

    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-30_2.12</artifactId>
        <version>7.17.9</version>
        <scope>provided</scope>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.0</version>
        <scope>provided</scope>
    </dependency>
    重要

    请确保pom.xml文件中Elasticsearch-spark-30_2.12的版本与阿里云Elasticsearch实例的版本一致,Spark-core_2.12的版本与AnalyticDB for MySQL Spark版本一致。

  3. 编写如下示例程序,并进行编译打包,本文生成的JAR包名称为spark-example.jar

    package org.example
    
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    object SparkEs {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().getOrCreate();
    
        // 生成一个dataframe
        val columns = Seq("language","users_count")
        val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
        val writeDF = spark.createDataFrame(data).toDF(columns:_*)
    
        // 写入数据
        writeDF.write.format("es").mode(SaveMode.Overwrite)
        // 阿里云Elasticsearch实例的私网地址
        .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com")
        // 阿里云Elasticsearch实例的私网端口号
        .option("es.port", "9200")
        // 阿里云Elasticsearch实例的用户名,固定写为elastic
        .option("es.net.http.auth.user", "elastic")
        // 阿里云Elasticsearch实例的密码
        .option("es.net.http.auth.pass", "password")
        // 连接阿里云Elasticsearch实例时,必须配置为true 
        .option("es.nodes.wan.only", "true")
        // 连接阿里云Elasticsearch实例时,必须配置为false
        .option("es.nodes.discovery", "false")
        // Spark读取的阿里云Elasticsearch实例的数据类型
        .save("spark/_doc")
    
        // 读取数据
        spark.read.format("es")
        // 阿里云Elasticsearch实例的私网地址
        .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com")
        // 阿里云Elasticsearch实例的私网端口号
        .option("es.port", "9200")
        // 阿里云Elasticsearch实例的用户名,固定写为elastic
        .option("es.net.http.auth.user", "elastic")
        // 阿里云Elasticsearch实例的密码
        .option("es.net.http.auth.pass", "password")
        // 连接阿里云Elasticsearch实例时,必须配置为true 
        .option("es.nodes.wan.only", "true")
        // 连接阿里云Elasticsearch实例时,必须配置为false
        .option("es.nodes.discovery", "false")
        // Spark读取的阿里云Elasticsearch实例的数据类型,格式为<index>/<type>
        .load("spark/_doc").show
      }
    }
  4. 将步骤1中下载的JAR包和示例程序spark-example.jar上传至OSS。具体操作,请参见上传文件

  5. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。

  6. 在左侧导航栏,单击作业开发 > Spark Jar 开发

  7. 在编辑器窗口上方,选择Job型资源组和Spark应用类型。本文以Batch类型为例。

  8. 在编辑器中执行以下作业内容。

    {
    
        "name": "ES-SPARK-EXAMPLE",
        "className": "com.aliyun.spark.ReadES",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****"
        },
        "file": "oss://testBucketName/spark-example.jar",
        "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar"
    }

    参数说明如下:

    参数

    说明

    name

    Spark作业名称。

    className

    Java或者Scala程序入口类,Python不需要指定入口类。

    conf

    与开源Spark中的配置项基本一致,参数格式为key:value形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明

    spark.adb.eni.enabled

    是否开启ENI访问。使用企业版、基础版及湖仓版Spark访问Elasticsearch数据源时,需要开启ENI访问。

    spark.adb.eni.vswitchId

    阿里云Elasticsearch实例的交换机ID。获取方法,请参见准备工作

    spark.adb.eni.securityGroupId

    阿里云Elasticsearch实例的安全组ID。获取方法,请参见准备工作

    file

    示例程序spark-example.jar所在的OSS路径。

    jars

    Spark作业依赖的JAR包所在的OSS路径。

  9. 单击立即执行

使用PySpark连接阿里云Elasticsearch

  1. 下载与阿里云Elasticsearch实例版本对应的JAR包,下载链接,请参见Elasticsearch Spark。本文下载的示例JAR包为Elasticsearch-spark-30_2.12-7.17.9.jar。

  2. 在pom.xml文件的dependencies中添加依赖项。

    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-30_2.12</artifactId>
        <version>7.17.9</version>
        <scope>provided</scope>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.0</version>
        <scope>provided</scope>
    </dependency>
    重要

    请确保pom.xml文件中Elasticsearch-spark-30_2.12的版本与阿里云Elasticsearch实例的版本一致,Spark-core_2.12的版本与AnalyticDB for MySQL Spark版本一致。

  3. 编写如下示例程序,并将示例程序存储为es-spark-example.py

    from pyspark.sql import SparkSession
    
    if __name__ == '__main__':
        spark = SparkSession \
            .builder \
            .getOrCreate()
    
        # 生成DataFrame
        dept = [("Finance", 10),
                ("Marketing", 20),
                ("Sales", 30),
                ("IT", 40)
                ]
        deptColumns = ["dept_name", "dept_id"]
        deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
        deptDF.printSchema()
        deptDF.show(truncate=False)
    
        # 写入数据
        deptDF.write.format('es').mode("overwrite") \
            #阿里云Elasticsearch实例的私网地址
            .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \
            #阿里云Elasticsearch实例的私网端口号
            .option('es.port', '9200') \
            #阿里云Elasticsearch实例的用户名,固定写为elastic
            .option('es.net.http.auth.user', 'elastic') \
            #阿里云Elasticsearch实例的密码
            .option('es.net.http.auth.pass', 'password') \
            #连接阿里云Elasticsearch实例时,必须配置为true
            .option("es.nodes.wan.only", "true") \
            #连接阿里云Elasticsearch实例时,必须配置为false
            .option("es.nodes.discovery", "false") \
            #Spark读取的阿里云Elasticsearch实例的数据类型,格式为<index>/<type>
            .save("spark/_doc")
    
        # 读取数据
        df = spark.read.format("es") \
            #阿里云Elasticsearch实例的私网地址
            .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \
            #阿里云Elasticsearch实例的私网端口号
            .option('es.port', '9200') \
            #阿里云Elasticsearch实例的用户名,固定写为elastic
            .option('es.net.http.auth.user', 'elastic') \
            #阿里云Elasticsearch实例的密码
            .option('es.net.http.auth.pass', 'password') \
            #连接阿里云Elasticsearch实例时,必须配置为true 
            .option("es.nodes.wan.only", "true") \
            #连接阿里云Elasticsearch实例时,必须配置为false
            .option("es.nodes.discovery", "false") \
            #Spark读取的阿里云Elasticsearch实例的数据类型,格式为<index>/<type>
            .load("spark/_doc").show
                            
  4. 将步骤1中下载的JAR包和es-spark-example.py程序上传到OSS中。具体操作,请参见上传文件

  5. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。

  6. 在左侧导航栏,单击作业开发 > Spark Jar 开发

  7. 在编辑器窗口上方,选择Job型资源组和Spark应用类型。本文以Batch类型为例。

  8. 在编辑器中执行以下作业内容。

    {
        "name": "ES-SPARK-EXAMPLE",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****"
        },
        "file": "oss://testBucketName/es-spark-example.py",
        "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar"
    }

    参数说明如下:

    参数

    说明

    name

    Spark作业的名称。

    conf

    与开源Spark中的配置项基本一致,参数格式为key:value形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明

    spark.adb.eni.enabled

    是否开启ENI访问。使用企业版、基础版及湖仓版Spark访问Elasticsearch数据源时,需要开启ENI访问。

    spark.adb.eni.vswitchId

    阿里云Elasticsearch实例的交换机ID。获取方法,请参见准备工作

    spark.adb.eni.securityGroupId

    阿里云Elasticsearch实例的安全组ID。获取方法,请参见准备工作

    file

    es-spark-example.py程序所在的OSS路径。

    jars

    Spark作业依赖的JAR包所在的OSS路径。

  9. 单击立即执行