全部产品

ElasticSearch

本文介绍如何使用Databricks 读写ElasticSearch数据。

前提条件

使用Databricks 读写ElasticSearch数据

说明

目前支持Apache Spark 2.x链接ElasticSearch,如果用到ElasticSearch数据源,在创建集群时请选择:

data
  1. DDI集群与ElasticSearch集群的VPC和VSwitch网络打通

    1. 陆ElasticSearch管理控制台Elasticsearch管理控制台

    2. 点击上部选择实例所在region

    3. 点击实例ID进入实例详情页面

    4. 查看基本信息找到对应的VPV和VSwitchdata

    5. 登录到databricks数据洞察集群阿里云Databricks控制台

    6. 选择集群所在region进入集群列表

    7. 点击集群实例进入集群详情页面

    8. 点击详情页面上方数据源页签进入数据源页面点击添加

    9. 选择通用网络,选择对应的VPC和VSwith点击下一步点击确认等待创建成功data

说明

如果没有接入请按照帮助文档访问外部数据源进行添加

2. 使用NoteBook读写ElasticSearch数据代码实现

1)要写入的json数据

{"productName":"大健康天天理财","annual_rate":"3.2200%","describe":"180天定期理财,最低20000起投,收益稳定,可以自助选择消息推送"}
{"productName":"西部通宝","annual_rate":"3.1100%","describe":"90天定投产品,最低10000起投,每天收益到账消息推送"}
{"productName":"安详畜牧产业","annual_rate":"3.3500%","describe":"270天定投产品,最低40000起投,每天收益立即到账消息推送"}
{"productName":"5G设备采购月月盈","annual_rate":"3.1200%","describe":"90天定投产品,最低12000起投,每天收益到账消息推送"}
{"productName":"新能源动力理财","annual_rate":"3.0100%","describe":"30天定投产品推荐,最低8000起投,每天收益会消息推送"}
{"productName":"微贷赚","annual_rate":"2.7500%","describe":"热门短期产品,3天短期,无须任何手续费用,最低500起投,通过短信提示获取收益消息"}

2)读取oss数据源,数据处理,将数据写入到ElasticSearch代码实现

%spark
//读取oss数据
val path="oss://databricks-data-source/datas/test.json"
val data = spark.read 
  .option("header", "true") 
  .option("inferSchema", "true") 
  .json(path)
//数据处理展示  
data.na.drop.show(10)
//写入数据到ES
data.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port","443")
  .option("es.net.http.auth.user", "your es username") //访问es的用户名
  .option("es.net.http.auth.pass", "your es password")
  .option("es.net.ssl","true")
  .option("pushdown"," true")
  .option("es.nodes", "your es url")
  .mode("Overwrite")
  .save("product_info/products")
data
说明

可以使用ElasticSearch可视化工具kibana进行查询,是否已经写入成功

GET /product_info/products/_search
{
  "query": {
    "range": {
      "annual_rate": {
        "gte": "3.0000%",
        "lte": "3.1300%"
      }
    }
  }
}
hit

3)读取ElasticSearch数据

%spark
val reader = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port","443")
  .option("es.net.http.auth.user", "your es username") //访问es的用户名
  .option("es.net.http.auth.pass", "your es password")
  .option("es.net.ssl","true")
  .option("pushdown"," true")
  .option("es.nodes",  "your es url")
reader.load("product_info/products").show()
data