Spark是一种通用的大数据计算框架,拥有Hadoop MapReduce所具有的计算优点,能够通过内存缓存数据为大型数据集提供快速的迭代功能。与MapReduce相比,减少了中间数据读取磁盘的过程,进而提高了处理能力。本文介绍如何通过ES-Hadoop实现Hadoop的Spark服务读写阿里云Elasticsearch数据。
准备工作
创建阿里云Elasticsearch实例,并开启自动创建索引功能。
具体操作步骤请参见创建阿里云Elasticsearch实例和配置YML参数。本文以6.7.0版本的实例为例。
重要在生产环境中,建议关闭自动创建索引功能,提前创建好索引和Mapping。由于本文仅用于测试,因此开启了自动创建索引功能。
创建与Elasticsearch实例在同一专有网络下的E-MapReduce(以下简称EMR)实例。
实例配置如下:
产品版本:EMR-3.29.0
必选服务:Spark(2.4.5),其他服务保持默认
具体操作步骤,请参见创建集群。
重要Elasticsearch实例的私网访问白名单默认为0.0.0.0/0,您可在安全配置页面查看,如果未使用默认配置,您还需要在白名单中加入EMR集群的内网IP地址:
请参见查看集群列表与详情,获取EMR集群的内网IP地址。
请参见配置实例公网或私网访问白名单,配置Elasticsearch实例的VPC私网访问白名单。
准备Java环境,要求JDK版本为8.0及以上。
编写并运行Spark任务
准备测试数据。
登录E-MapReduce控制台,获取Master节点的IP地址,并通过SSH登录对应的ECS机器。
具体操作步骤,请参见登录集群。
将测试数据写入文件中。
本文使用的JSON数据示例如下,将该数据保存在http_log.txt文件中。
{"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"} {"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"} {"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}
执行以下命令,将测试数据上传至EMR Master节点的tmp/hadoop-es文件中。
hadoop fs -put http_log.txt /tmp/hadoop-es
配置pom依赖。
创建Java Maven工程,并将如下的pom依赖添加到Java工程的pom.xml文件中。
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>6.7.0</version> </dependency> </dependencies>
重要请确保pom依赖中版本与云服务对应版本保持一致,例如elasticsearch-spark-20_2.11版本与阿里云Elasticsearch版本一致;spark-core_2.12与HDFS版本一致。
编写示例代码。
写数据
以下示例代码用来将测试数据写入Elasticsearch的company索引中。
import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import org.spark_project.guava.collect.ImmutableMap; public class SparkWriteEs { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("Es-write"); conf.set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com"); conf.set("es.net.http.auth.user", "elastic"); conf.set("es.net.http.auth.pass", "xxxxxx"); conf.set("es.nodes.wan.only", "true"); conf.set("es.nodes.discovery","false"); conf.set("es.input.use.sliced.partitions","false"); SparkSession ss = new SparkSession(new SparkContext(conf)); final AtomicInteger employeesNo = new AtomicInteger(0); //以下的/tmp/hadoop-es/http_log.txt需要替换为您测试数据的路径。 JavaRDD<Map<Object, ?>> javaRDD = ss.read().text("/tmp/hadoop-es/http_log.txt") .javaRDD().map((Function<Row, Map<Object, ?>>) row -> ImmutableMap.of("employees" employeesNo.getAndAdd(1), row.mkString())); JavaEsSpark.saveToEs(javaRDD, "company/_doc"); } }
读数据
以下示例代码用来读取上一步写入Elasticsearch的数据,并进行打印。
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import java.util.Map; public class ReadES { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("readEs").setMaster("local[*]") .set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com") .set("es.port", "9200") .set("es.net.http.auth.user", "elastic") .set("es.net.http.auth.pass", "xxxxxx") .set("es.nodes.wan.only", "true") .set("es.nodes.discovery","false") .set("es.input.use.sliced.partitions","false") .set("es.resource", "company/_doc") .set("es.scroll.size","500"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc); for ( Map<String, Object> item : rdd.values().collect()) { System.out.println(item); } sc.stop(); } }
表 1. 参数说明 参数
默认值
说明
es.nodes
localhost
指定阿里云Elasticsearch实例的访问地址,建议使用私网地址,可在实例的基本信息页面查看。更多信息,请参见查看实例的基本信息。
es.port
9200
Elasticsearch实例的访问端口号。
es.net.http.auth.user
elastic
Elasticsearch实例的访问用户名。
说明如果程序中指定elastic账号访问Elasticsearch服务,后续在修改elastic账号对应密码后需要一些时间来生效,在密码生效期间会影响服务访问,因此不建议通过elastic来访问。建议在Kibana控制台中创建一个符合预期的Role角色用户进行访问,详情请参见通过Elasticsearch X-Pack角色管理实现用户权限管控。
es.net.http.auth.pass
/
对应用户的密码,在创建实例时指定。如果忘记可进行重置,具体操作步骤,请参见重置实例访问密码。
es.nodes.wan.only
false
开启Elasticsearch集群在云上使用虚拟IP进行连接,是否进行节点嗅探:
true:设置
false:不设置
es.nodes.discovery
true
是否禁用节点发现:
true:禁用
false:不禁用
重要使用阿里云Elasticsearch,必须将此参数设置为false。
es.input.use.sliced.partitions
true
是否使用slice分区:
true:使用。设置为true,可能会导致索引在预读阶段的时间明显变长,有时会远远超出查询数据所耗费的时间。建议设置为false,以提高查询效率。
false:不使用。
es.index.auto.create
true
通过Hadoop组件向Elasticsearch集群写入数据,是否自动创建不存在的index:
true:自动创建
false:不会自动创建
es.resource
/
指定要读写的index和type。
es.mapping.names
/
表字段与Elasticsearch的索引字段名映射。
更多的ES-Hadoop配置项说明,请参见官方配置说明。
将代码打成Jar包,上传至EMR客户端机器(例如Gateway或EMR集群主节点)。
在EMR客户端机器上,运行如下命令执行Spark程序:
写数据
cd /usr/lib/spark-current ./bin/spark-submit --master yarn --executor-cores 1 --class "SparkWriteEs" /usr/local/spark_es.jar
重要/usr/local/spark_es.jar需要替换为您Jar包上传的路径。
读数据
cd /usr/lib/spark-current ./bin/spark-submit --master yarn --executor-cores 1 --class "ReadES" /usr/local/spark_es.jar
读数据成功后,打印结果如下。
验证结果
登录对应阿里云Elasticsearch实例的Kibana控制台。
具体操作步骤请参见登录Kibana控制台。
在左侧导航栏,单击Dev Tools。
在Console中,执行以下命令,查看通过Spark任务写入的数据。
GET company/_search { "query": { "match_all": {} } }
查询成功后,返回结果如下。
总结
本文以阿里云Elasticsearch和EMR为例,介绍了如何通过ES-Hadoop,实现Spark读写阿里云Elasticsearch数据。与其他EMR组件相比,ES-Hadoop与Spark的集成,不仅包括RDD,还包括Spark Streaming、scale、DataSet与Spark SQL等,您可以根据需求进行配置。详细信息,请参见Apache Spark support。