本文为您介绍如何通过DataWorks数据同步功能,迁移阿里云Elasticsearch集群上的数据至MaxCompute。
前提条件
已开通MaxCompute服务。
开通指导,详情请参见开通MaxCompute和DataWorks。
已开通DataWorks服务。
开通指导,详情请参见开通DataWorks服务。
新增MaxCompute数据源。详情请参见创建MaxCompute数据源。
在DataWorks上已完成创建业务流程。
本例使用DataWorks简单模式,详情请参见创建业务流程。
已搭建阿里云Elasticsearch集群。
进行数据迁移前,您需要保证自己的阿里云Elasticsearch集群环境正常。搭建阿里云Elasticsearch集群的详细过程,请参见快速入门。
本示例中阿里云Elasticsearch的具体配置如下:
地域:华东2(上海)
可用区:上海可用区B
版本:5.5.3 with Commercial Feature
背景信息
Elasticsearch是一个基于Lucene的搜索服务器,它提供了一个多用户分布式的全文搜索引擎。Elasticsearch是遵从Apache开源条款的一款开源产品,是当前主流的企业级搜索引擎。
阿里云Elasticsearch提供Elasticsearch 5.5.3 with Commercial Feature、6.3.2 with Commercial Feature、6.7.0 with Commercial Feature及商业插件X-pack服务,致力于数据分析、数据搜索等场景服务。在开源Elasticsearch基础上提供企业级权限管控、安全监控告警、自动报表生成等功能。
操作步骤
在Elasticsearch上创建源表。详情请参见通过DataWorks将MaxCompute数据同步到阿里云ES。
在MaxCompute上创建目标表。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的 ,在下拉框中选择对应工作空间后单击进入数据开发。
右键单击您所创建的业务流程,选择。
在弹出的新建表对话框中,填写名称,并单击新建。
说明如果在数据开发中绑定多个MaxCompute数据源,则按需选择MaxCompute引擎实例。
单击表编辑页面上方的DDL模式。
在DDL对话框,输入如下建表语句,单击生成表结构。
create table elastic2mc_bankdata ( age string, job string, marital string, education string, default string, housing string, loan string, contact string, month string, day of week string );
单击提交到生产环境。
同步数据。
进入数据开发页面,右键单击指定业务流程,选择 。
在新建节点对话框中,输入名称,并单击确认。
在顶部菜单栏上,单击图标。
在脚本模式下,单击顶部菜单栏上的图标。
在导入模板对话框中选择来源类型、数据源、目标类型及数据源,并单击确定。
配置脚本。
示例代码如下。代码释义请参见Elasticsearch Reader。
{ "type": "job", "steps": [ { "stepType": "elasticsearch", "parameter": { "retryCount": 3, "column": [ "age", "job", "marital", "education", "default", "housing", "loan", "contact", "month", "day_of_week", "duration", "campaign", "pdays", "previous", "poutcome", "emp_var_rate", "cons_price_idx", "cons_conf_idx", "euribor3m", "nr_employed", "y" ], "scroll": "1m", "index": "es_index", "pageSize": 1, "sort": { "age": "asc" }, "type": "elasticsearch", "connTimeOut": 1000, "retrySleepTime": 1000, "endpoint": "http://es-cn-xxxx.xxxx.xxxx.xxxx.com:9200", "password": "xxxx", "search": { "match_all": {} }, "readTimeOut": 5000, "username": "xxxx" }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "", "truncate": true, "compress": false, "datasource": "odps_source",// MaxCompute数据源名称 "column": [ "age", "job", "marital", "education", "default", "housing", "loan", "contact", "month", "day_of_week", "duration", "campaign", "pdays", "previous", "poutcome", "emp_var_rate", "cons_price_idx", "cons_conf_idx", "euribor3m", "nr_employed", "y" ], "emptyAsNull": false, "table": "elastic2mc_bankdata" }, "name": "Writer", "category": "writer" } ], "version": "2.0", "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "0" }, "speed": { "throttle": false, "concurrent": 1, "dmu": 1 } } }
说明您可以在创建的阿里云Elasticsearch集群的基本信息中,查看公网地址和公网端口信息。
单击图标运行代码。
您可以在运行日志查看运行结果。
查看结果。
右键单击业务流程,选择 。
在新建节点对话框中输入节点名称,并单击提交。
在ODPS SQL节点编辑页面输入如下语句。
SELECT * FROM elastic2mc_bankdata;
单击图标运行代码。
您可以在运行日志查看运行结果。