This topic describes how to migrate data from an Alibaba Cloud Elasticsearch cluster to MaxCompute by using the data synchronization feature of DataWorks.

Prerequisites

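  • MaxCompute is activated.
  • DataWorks is activated.
  • A workflow is created in DataWorks. This example uses a DataWorks workspace in basic mode. For more information, see Create a workflow.
  • An Alibaba Cloud Elasticsearch cluster is created.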

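    Before you migrate data, make sure that your Alibaba Cloud Elasticsearch cluster is running properly. For more information about how to create an Alibaba Cloud Elasticsearch cluster, see Elasticsearch quick start.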

    The Alibaba Cloud Elasticsearch cluster in this example uses the following configuration:
    • Region: China East 2 (Shanghai)
    • Zone: Shanghai Zone B
    • Version: 5.5.3 with Commercial Feature
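One way to confirm that the cluster is running properly before you migrate is to call the Elasticsearch `_cluster/health` API. The Python sketch below only builds that request and interprets a response body; it does not contact the cluster. The endpoint and credentials are the same `xxxx` placeholders used in the sync script, and treating both `green` and `yellow` as usable for reading is an assumption, not an official criterion.

```python
import base64
import json
import urllib.request

# Placeholders copied from the sync script; replace them with real values.
ENDPOINT = "http://es-cn-xxxx.xxxx.xxxx.xxxx.com:9200"
USERNAME = "xxxx"
PASSWORD = "xxxx"


def build_health_request(endpoint: str, username: str, password: str) -> urllib.request.Request:
    """Build (but do not send) a GET /_cluster/health request with HTTP basic auth."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    req = urllib.request.Request(endpoint.rstrip("/") + "/_cluster/health", method="GET")
    req.add_header("Authorization", f"Basic {token}")
    return req


def is_cluster_usable(health_body: str) -> bool:
    """Assumption: "green" or "yellow" is good enough to read from.

    "yellow" means all primary shards are assigned but some replicas are not;
    "red" means at least one primary shard is unassigned.
    """
    return json.loads(health_body).get("status") in ("green", "yellow")


if __name__ == "__main__":
    req = build_health_request(ENDPOINT, USERNAME, PASSWORD)
    print(req.full_url)
    # To send it for real (network access required):
    # body = urllib.request.urlopen(req, timeout=5).read().decode("utf-8")
    # print(is_cluster_usable(body))
```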

Background information

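Elasticsearch is a Lucene-based search server that provides a distributed, multi-tenant full-text search engine. It is an open source product released under the Apache License and is one of the most widely used enterprise search engines.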

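Alibaba Cloud Elasticsearch provides Elasticsearch 5.5.3 with Commercial Feature, 6.3.2 with Commercial Feature, and 6.7.0 with Commercial Feature, together with the commercial X-Pack plug-in, for scenarios such as data analysis and data search. On top of open source Elasticsearch, it adds enterprise-grade features such as access control, security monitoring and alerting, and automatic report generation.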

Procedure

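  1. Create the source index (es_index in this example) in Elasticsearch. For more information, see Synchronize data from MaxCompute to Alibaba Cloud Elasticsearch by using DataWorks.
  2. Create the destination table in MaxCompute.
    1. Log on to the DataWorks console.
    2. Right-click the workflow and choose Create > MaxCompute > Table.
    3. On the Create Table page, select the engine type and enter a table name.
    4. On the table editing page, click DDL Mode.
    5. In the DDL Mode dialog box, enter the following statement and click Generate Table Schema.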
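      create table elastic2mc_bankdata 
      (
      age             string,
      job             string,
      marital         string,
      education       string,
      `default`       string,
      housing         string,
      loan            string,
      contact         string,
      month           string,
      day_of_week     string,
      duration        string,
      campaign        string,
      pdays           string,
      previous        string,
      poutcome        string,
      emp_var_rate    string,
      cons_price_idx  string,
      cons_conf_idx   string,
      euribor3m       string,
      nr_employed     string,
      y               string
      );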
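    6. Click Commit to Production Environment.
  3. Synchronize data.
    1. On the DataStudio page, right-click the workflow and choose Create > Data Integration > Batch Synchronization.
    2. In the Create Node dialog box, enter a node name and click Commit.
    3. In the top toolbar, click the Switch to Script Mode icon.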
    4. In script mode, click the Import Template icon in the top toolbar.
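    5. In the Import Template dialog box, select the source type and its data source and the destination type and its data source, and then click OK.
    6. Configure the script.
      The following code is an example. For more information about the parameters, see Elasticsearch Reader.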
      {
          "type": "job",
          "steps": [
              {
                  "stepType": "elasticsearch",
                  "parameter": {
                      "retryCount": 3,
                      "column": [
                          "age",
                          "job",
                          "marital",
                          "education",
                          "default",
                          "housing",
                          "loan",
                          "contact",
                          "month",
                          "day_of_week",
                          "duration",
                          "campaign",
                          "pdays",
                          "previous",
                          "poutcome",
                          "emp_var_rate",
                          "cons_price_idx",
                          "cons_conf_idx",
                          "euribor3m",
                          "nr_employed",
                          "y"
                      ],
                      "scroll": "1m",
                      "index": "es_index",
                      "pageSize": 1,
                      "sort": {
                          "age": "asc"
                      },
                      "type": "elasticsearch",
                      "connTimeOut": 1000,
                      "retrySleepTime": 1000,
                      "endpoint": "http://es-cn-xxxx.xxxx.xxxx.xxxx.com:9200",
                      "password": "xxxx",
                      "search": {
                          "match_all": {}
                      },
                      "readTimeOut": 5000,
                      "username": "xxxx"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "odps",
                  "parameter": {
                      "partition": "",
                      "truncate": true,
                      "compress": false,
                      "datasource": "odps_first",
                      "column": [
                          "age",
                          "job",
                          "marital",
                          "education",
                          "default",
                          "housing",
                          "loan",
                          "contact",
                          "month",
                          "day_of_week",
                          "duration",
                          "campaign",
                          "pdays",
                          "previous",
                          "poutcome",
                          "emp_var_rate",
                          "cons_price_idx",
                          "cons_conf_idx",
                          "euribor3m",
                          "nr_employed",
                          "y"
                      ],
                      "emptyAsNull": false,
                      "table": "elastic2mc_bankdata"
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "version": "2.0",
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          },
          "setting": {
              "errorLimit": {
                  "record": "0"
              },
              "speed": {
                  "throttle": false,
                  "concurrent": 1,
                  "dmu": 1
              }
          }
      }
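      Note: You can find the public endpoint and public port on the basic information page of your Alibaba Cloud Elasticsearch cluster. Use them in endpoint, and replace the username and password placeholders (xxxx) with the access credentials of the cluster.
    7. Click the Run icon to run the code.
    8. View the results in the run log.
  4. Verify the results.
    1. Right-click the workflow and choose Create > MaxCompute > ODPS SQL.
    2. In the Create Node dialog box, enter a node name and click Commit.
    3. On the editing page of the ODPS SQL node, enter the following statement: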
      SELECT * FROM elastic2mc_bankdata;
    4. Click the Run icon to run the code.
    5. View the results in the run log.
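The reader and writer column lists in the sync script are expected to line up one to one, and every writer column should exist in the destination table. The Python helper below is a hypothetical pre-flight check for that, not a DataWorks feature; `step_columns`, `ddl_columns`, and `check_job` are illustrative names, and `ddl_columns` only understands simple DDL with one `name type` pair per line.

```python
def step_columns(job: dict, category: str) -> list:
    """Return the column list of the first step whose category is "reader" or "writer"."""
    for step in job["steps"]:
        if step.get("category") == category:
            return list(step["parameter"]["column"])
    raise KeyError(f"no {category} step in job")


def ddl_columns(ddl: str) -> list:
    """Extract column names from a simple CREATE TABLE statement.

    Assumes one "name type" pair per line and no parenthesized types
    such as DECIMAL(10,2); backticks around names are stripped.
    """
    body = ddl[ddl.index("(") + 1 : ddl.rindex(")")]
    names = []
    for line in body.splitlines():
        line = line.split("--", 1)[0].strip().rstrip(",")  # drop comments and trailing comma
        if line:
            names.append(line.split()[0].strip("`"))
    return names


def check_job(job: dict, ddl: str) -> list:
    """Return a list of human-readable problems; an empty list means the check passed."""
    problems = []
    reader = step_columns(job, "reader")
    writer = step_columns(job, "writer")
    if len(reader) != len(writer):
        problems.append(f"reader has {len(reader)} columns, writer has {len(writer)}")
    table = set(ddl_columns(ddl))
    missing = [c for c in writer if c not in table]
    if missing:
        problems.append(f"writer columns missing from table: {missing}")
    return problems


if __name__ == "__main__":
    job = {"steps": [
        {"category": "reader", "parameter": {"column": ["age", "job", "day_of_week"]}},
        {"category": "writer", "parameter": {"column": ["age", "job", "day_of_week"]}},
    ]}
    ddl = "create table t (\n  age string,\n  job string\n);"
    print(check_job(job, ddl))
```

In practice you would load the script text with `json.loads` and pass in the DDL statement from the table-creation step.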
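To illustrate what reading an index page by page involves, the following sketch maps the reader parameters `index`, `scroll`, `pageSize`, `search`, `sort`, and `column` onto requests of the Elasticsearch scroll API. This mapping is an assumption for illustration only; the reader's actual internal requests are not documented here and may differ. The function names are hypothetical.

```python
import json


def build_scroll_search(param: dict) -> tuple:
    """Translate reader-style parameters into an initial scroll search request.

    Returns (path, body) for POST <endpoint><path>. This is an illustrative
    mapping onto the Elasticsearch scroll API, not the reader's actual code.
    """
    path = f"/{param['index']}/_search?scroll={param['scroll']}"
    body = {
        "size": param["pageSize"],  # documents per page
        "query": param["search"],  # e.g. {"match_all": {}}
        "sort": [{field: {"order": order}} for field, order in param["sort"].items()],
        "_source": param["column"],  # only return the mapped fields
    }
    return path, body


def build_scroll_next(scroll: str, scroll_id: str) -> tuple:
    """Request for the next page; scroll_id comes from the previous response's "_scroll_id"."""
    return "/_search/scroll", {"scroll": scroll, "scroll_id": scroll_id}


if __name__ == "__main__":
    param = {
        "index": "es_index",
        "scroll": "1m",
        "pageSize": 1,
        "search": {"match_all": {}},
        "sort": {"age": "asc"},
        "column": ["age", "job", "marital"],
    }
    path, body = build_scroll_search(param)
    print(path)
    print(json.dumps(body, indent=2))
```

With `pageSize` set to 1, as in the sample script, each scroll page returns a single document, so larger indexes will likely need a larger page size for reasonable throughput.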
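Besides inspecting the rows returned by SELECT *, a simple completeness check is to compare the document count of the source index, returned by `GET /es_index/_count`, with the result of `SELECT COUNT(*) FROM elastic2mc_bankdata;`. Because the writer is configured with `"truncate": true`, the table should hold only the rows from the latest run. The sketch below only parses and compares the two numbers; fetching them is left out, and the function names are hypothetical. A mismatch can also mean that documents were written to the index during the sync.

```python
import json


def es_doc_count(count_body: str) -> int:
    """Parse the JSON body returned by GET /<index>/_count, e.g. {"count": 42, ...}."""
    return int(json.loads(count_body)["count"])


def verify_row_count(count_body: str, maxcompute_rows: int) -> str:
    """Compare the Elasticsearch document count with SELECT COUNT(*) from MaxCompute."""
    es_rows = es_doc_count(count_body)
    if es_rows == maxcompute_rows:
        return f"OK: {es_rows} rows in both"
    return f"MISMATCH: Elasticsearch has {es_rows}, MaxCompute has {maxcompute_rows}"


if __name__ == "__main__":
    # Hypothetical values; take the real ones from the _count API and the ODPS SQL run log.
    print(verify_row_count('{"count": 42, "_shards": {"total": 5, "successful": 5, "failed": 0}}', 42))
```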