Elasticsearch数据迁移至MaxCompute

本文为您介绍如何通过DataWorks数据同步功能,迁移阿里云Elasticsearch集群上的数据至MaxCompute。

前提条件

  • 已开通MaxCompute服务。

    开通指导,详情请参见开通MaxCompute和DataWorks

  • 已开通DataWorks服务。

    开通指导,详情请参见开通DataWorks服务

  • 新增MaxCompute数据源。详情请参见创建MaxCompute数据源

  • 在DataWorks上已完成创建业务流程。

    本例使用DataWorks简单模式,详情请参见创建业务流程

  • 已搭建阿里云Elasticsearch集群。

    进行数据迁移前,您需要保证自己的阿里云Elasticsearch集群环境正常。搭建阿里云Elasticsearch集群的详细过程,请参见快速入门

    本示例中阿里云Elasticsearch的具体配置如下:

    • 地域:华东2(上海)

    • 可用区:上海可用区B

    • 版本:5.5.3 with Commercial Feature

背景信息

Elasticsearch是一个基于Lucene的搜索服务器,它提供了一个多用户分布式的全文搜索引擎。Elasticsearch是遵从Apache开源条款的一款开源产品,是当前主流的企业级搜索引擎。

阿里云Elasticsearch提供Elasticsearch 5.5.3 with Commercial Feature、6.3.2 with Commercial Feature、6.7.0 with Commercial Feature及商业插件X-pack服务,致力于数据分析、数据搜索等场景服务。在开源Elasticsearch基础上提供企业级权限管控、安全监控告警、自动报表生成等功能。

操作步骤

  1. 在Elasticsearch上创建源表。详情请参见通过DataWorks将MaxCompute数据同步到阿里云ES

  2. 在MaxCompute上创建目标表。

    1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

    2. 右键单击您所创建的业务流程,选择新建表 > MaxCompute >

    3. 在弹出的新建表对话框中,填写名称,并单击新建

      说明

      如果在数据开发中绑定多个MaxCompute数据源,则按需选择MaxCompute引擎实例

    4. 单击表编辑页面上方的DDL模式。

    5. DDL对话框,输入如下建表语句,单击生成表结构

      create table elastic2mc_bankdata 
      (
      age             string,
      job             string,
      marital         string,
      education       string,
      default         string,
      housing         string,
      loan            string,
      contact         string,
      month           string,
      day of week     string
      );
    6. 单击提交到生产环境

  3. 同步数据。

    1. 进入数据开发页面,右键单击指定业务流程,选择新建节点 > 数据集成 > 离线同步

    2. 新建节点对话框中,输入名称,并单击确认

    3. 在顶部菜单栏上,单击转化脚本图标。

    4. 在脚本模式下,单击顶部菜单栏上的**图标。

    5. 导入模板对话框中选择来源类型数据源目标类型数据源,并单击确定

    6. 配置脚本。

      示例代码如下。代码释义请参见Elasticsearch Reader

      {
       "type": "job",
       "steps": [
       {
       "stepType": "elasticsearch",
       "parameter": {
       "retryCount": 3,
       "column": [
       "age",
       "job",
       "marital",
       "education",
       "default",
       "housing",
       "loan",
       "contact",
       "month",
       "day_of_week",
       "duration",
       "campaign",
       "pdays",
       "previous",
       "poutcome",
       "emp_var_rate",
       "cons_price_idx",
       "cons_conf_idx",
       "euribor3m",
       "nr_employed",
       "y"
       ],
       "scroll": "1m",
       "index": "es_index",
       "pageSize": 1,
       "sort": {
       "age": "asc"
      },
       "type": "elasticsearch",
       "connTimeOut": 1000,
       "retrySleepTime": 1000,
       "endpoint": "http://es-cn-xxxx.xxxx.xxxx.xxxx.com:9200",
       "password": "xxxx",
       "search": {
       "match_all": {}
       },
       "readTimeOut": 5000,
       "username": "xxxx"
       },
       "name": "Reader",
       "category": "reader"
       },
       {
       "stepType": "odps",
       "parameter": {
       "partition": "",
       "truncate": true,
       "compress": false,
       "datasource": "odps_source",// MaxCompute数据源名称
       "column": [
       "age",
       "job",
       "marital",
       "education",
       "default",
       "housing",
       "loan",
       "contact",
       "month",
       "day_of_week",
       "duration",
       "campaign",
       "pdays",
       "previous",
       "poutcome",
       "emp_var_rate",
       "cons_price_idx",
       "cons_conf_idx",
       "euribor3m",
       "nr_employed",
       "y"
       ],
       "emptyAsNull": false,
       "table": "elastic2mc_bankdata"
       },
       "name": "Writer",
       "category": "writer"
       }
       ],
       "version": "2.0",
       "order": {
       "hops": [
       {
       "from": "Reader",
       "to": "Writer"
       }
       ]
       },
       "setting": {
       "errorLimit": {
       "record": "0"
       },
       "speed": {
       "throttle": false,
       "concurrent": 1,
       "dmu": 1
       }
       }
      }
      说明

      您可以在创建的阿里云Elasticsearch集群的基本信息中,查看公网地址公网端口信息。

    7. 单击**图标运行代码。

    8. 您可以在运行日志查看运行结果。

  4. 查看结果。

    1. 右键单击业务流程,选择新建 > MaxCompute > ODPS SQL

    2. 新建节点对话框中输入节点名称,并单击提交

    3. 在ODPS SQL节点编辑页面输入如下语句。

      SELECT * FROM elastic2mc_bankdata;
    4. 单击**图标运行代码。

    5. 您可以在运行日志查看运行结果。