访问Elasticsearch数据源
本文介绍如何使用云原生数据仓库 AnalyticDB MySQL 版Spark通过ENI网络读取Elasticsearch数据源。
前提条件
AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版。
已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组。
已创建数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。
AnalyticDB for MySQL集群与OSS存储空间位于相同地域。
AnalyticDB for MySQL集群与阿里云Elasticsearch实例位于同一地域。具体操作,请参见创建阿里云Elasticsearch实例。
已将AnalyticDB for MySQL的IP地址添加至阿里云Elasticsearch实例的白名单中。具体操作,请参见配置实例公网或私网访问白名单。
准备工作
在阿里云Elasticsearch控制台的基本信息页面,获取交换机ID。
在ECS管理控制台的安全组页面,获取阿里云Elasticsearch实例所属的安全组ID。如未添加安全组,请参见创建安全组。
使用Scala连接阿里云Elasticsearch
下载与阿里云Elasticsearch实例版本对应的JAR包,下载链接,请参见Elasticsearch Spark。本文下载的示例JAR包为Elasticsearch-spark-30_2.12-7.17.9.jar。
在pom.xml文件的dependencies中添加依赖项。
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-30_2.12</artifactId> <version>7.17.9</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.0</version> <scope>provided</scope> </dependency>
重要请确保pom.xml文件中Elasticsearch-spark-30_2.12的版本与阿里云Elasticsearch实例的版本一致,Spark-core_2.12的版本与AnalyticDB for MySQL Spark版本一致。
编写如下示例程序,并进行编译打包,本文生成的JAR包名称为
spark-example.jar
。package org.example import org.apache.spark.sql.{SaveMode, SparkSession} object SparkEs { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate(); // 生成一个dataframe val columns = Seq("language","users_count") val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000")) val writeDF = spark.createDataFrame(data).toDF(columns:_*) // 写入数据 writeDF.write.format("es").mode(SaveMode.Overwrite) // 阿里云Elasticsearch实例的私网地址 .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com") // 阿里云Elasticsearch实例的私网端口号 .option("es.port", "9200") // 阿里云Elasticsearch实例的用户名,固定写为elastic .option("es.net.http.auth.user", "elastic") // 阿里云Elasticsearch实例的密码 .option("es.net.http.auth.pass", "password") // 连接阿里云Elasticsearch实例时,必须配置为true .option("es.nodes.wan.only", "true") // 连接阿里云Elasticsearch实例时,必须配置为false .option("es.nodes.discovery", "false") // Spark读取的阿里云Elasticsearch实例的数据类型 .save("spark/_doc") // 读取数据 spark.read.format("es") // 阿里云Elasticsearch实例的私网地址 .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com") // 阿里云Elasticsearch实例的私网端口号 .option("es.port", "9200") // 阿里云Elasticsearch实例的用户名,固定写为elastic .option("es.net.http.auth.user", "elastic") // 阿里云Elasticsearch实例的密码 .option("es.net.http.auth.pass", "password") // 连接阿里云Elasticsearch实例时,必须配置为true .option("es.nodes.wan.only", "true") // 连接阿里云Elasticsearch实例时,必须配置为false .option("es.nodes.discovery", "false") // Spark读取的阿里云Elasticsearch实例的数据类型,格式为<index>/<type> .load("spark/_doc").show } }
将步骤1中下载的JAR包和示例程序
spark-example.jar
上传至OSS。具体操作,请参见上传文件。登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击
。在编辑器窗口上方,选择Job型资源组和Spark应用类型。本文以Batch类型为例。
在编辑器中执行以下作业内容。
{ "name": "ES-SPARK-EXAMPLE", "className": "com.aliyun.spark.ReadES", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****" }, "file": "oss://testBucketName/spark-example.jar", "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar" }
参数说明如下:
参数
说明
name
Spark作业名称。
className
Java或者Scala程序入口类,Python不需要指定入口类。
conf
与开源Spark中的配置项基本一致,参数格式为
key:value
形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明。spark.adb.eni.enabled
是否开启ENI访问。使用企业版、基础版及湖仓版Spark访问Elasticsearch数据源时,需要开启ENI访问。
spark.adb.eni.vswitchId
阿里云Elasticsearch实例的交换机ID。获取方法,请参见准备工作。
spark.adb.eni.securityGroupId
阿里云Elasticsearch实例的安全组ID。获取方法,请参见准备工作。
file
示例程序
spark-example.jar
所在的OSS路径。jars
Spark作业依赖的JAR包所在的OSS路径。
单击立即执行。
使用PySpark连接阿里云Elasticsearch
下载与阿里云Elasticsearch实例版本对应的JAR包,下载链接,请参见Elasticsearch Spark。本文下载的示例JAR包为Elasticsearch-spark-30_2.12-7.17.9.jar。
在pom.xml文件的dependencies中添加依赖项。
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-30_2.12</artifactId> <version>7.17.9</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.0</version> <scope>provided</scope> </dependency>
重要请确保pom.xml文件中Elasticsearch-spark-30_2.12的版本与阿里云Elasticsearch实例的版本一致,Spark-core_2.12的版本与AnalyticDB for MySQL Spark版本一致。
编写如下示例程序,并将示例程序存储为
es-spark-example.py
。from pyspark.sql import SparkSession if __name__ == '__main__': spark = SparkSession \ .builder \ .getOrCreate() # 生成DataFrame dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40) ] deptColumns = ["dept_name", "dept_id"] deptDF = spark.createDataFrame(data=dept, schema=deptColumns) deptDF.printSchema() deptDF.show(truncate=False) # 写入数据 deptDF.write.format('es').mode("overwrite") \ #阿里云Elasticsearch实例的私网地址 .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \ #阿里云Elasticsearch实例的私网端口号 .option('es.port', '9200') \ #阿里云Elasticsearch实例的用户名,固定写为elastic .option('es.net.http.auth.user', 'elastic') \ #阿里云Elasticsearch实例的密码 .option('es.net.http.auth.pass', 'password') \ #连接阿里云Elasticsearch实例时,必须配置为true .option("es.nodes.wan.only", "true") \ #连接阿里云Elasticsearch实例时,必须配置为false .option("es.nodes.discovery", "false") \ #Spark读取的阿里云Elasticsearch实例的数据类型,格式为<index>/<type> .save("spark/_doc") # 读取数据 df = spark.read.format("es") \ #阿里云Elasticsearch实例的私网地址 .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \ #阿里云Elasticsearch实例的私网端口号 .option('es.port', '9200') \ #阿里云Elasticsearch实例的用户名,固定写为elastic .option('es.net.http.auth.user', 'elastic') \ #阿里云Elasticsearch实例的密码 .option('es.net.http.auth.pass', 'password') \ #连接阿里云Elasticsearch实例时,必须配置为true .option("es.nodes.wan.only", "true") \ #连接阿里云Elasticsearch实例时,必须配置为false .option("es.nodes.discovery", "false") \ #Spark读取的阿里云Elasticsearch实例的数据类型,格式为<index>/<type> .load("spark/_doc").show
将步骤1中下载的JAR包和
es-spark-example.py
程序上传到OSS中。具体操作,请参见上传文件。登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击
。在编辑器窗口上方,选择Job型资源组和Spark应用类型。本文以Batch类型为例。
在编辑器中执行以下作业内容。
{ "name": "ES-SPARK-EXAMPLE", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****" }, "file": "oss://testBucketName/es-spark-example.py", "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar" }
参数说明如下:
参数
说明
name
Spark作业的名称。
conf
与开源Spark中的配置项基本一致,参数格式为
key:value
形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明。spark.adb.eni.enabled
是否开启ENI访问。使用企业版、基础版及湖仓版Spark访问Elasticsearch数据源时,需要开启ENI访问。
spark.adb.eni.vswitchId
阿里云Elasticsearch实例的交换机ID。获取方法,请参见准备工作。
spark.adb.eni.securityGroupId
阿里云Elasticsearch实例的安全组ID。获取方法,请参见准备工作。
file
es-spark-example.py
程序所在的OSS路径。jars
Spark作业依赖的JAR包所在的OSS路径。
单击立即执行。