阿里云上拥有丰富的云存储、云数据库产品。当您需要对这些产品中的数据进行分析和搜索时,可以通过DataWorks的数据集成服务,实现最快5分钟一次的离线数据采集任务,并同步到阿里云Elasticsearch中。本教程以阿里云RDS MySQL为例。
教程概述
- 准备工作准备MySQL数据源、创建DataWorks工作空间、创建与配置阿里云Elasticsearch实例。
- 步骤一:购买并创建独享资源组购买并创建一个数据集成独享资源组,并为该资源组绑定专有网络和工作空间。独享资源组可以保障数据快速、稳定地传输。
- 步骤二:添加数据源将MySQL和Elasticsearch数据源接入DataWorks的数据集成服务中。
- 步骤三:配置并运行数据同步任务配置一个数据同步的脚本,将数据集成系统同步成功的数据存储到Elasticsearch中。将独享资源组作为一个可以执行任务的资源,注册到DataWorks的数据集成服务中。这个资源组将获取数据源的数据,并执行将数据写入Elasticsearch中的任务(该任务将由数据集成系统统一下发)。
- 步骤四:验证数据同步结果在Kibana控制台中,查看同步成功的数据。
准备工作
- 创建一个数据库。您可以选择使用阿里云的RDS数据库,也可以在本地服务器上自建数据库。本教程以RDS MySQL数据库为例,使用JOIN获取两张表数据,同步数据到阿里云Elasticsearch中,表字段及数据如下所示。具体操作步骤请参见创建RDS MySQL实例。
图 1. 表一 图 2. 表二 - 创建DataWorks工作空间。具体操作步骤请参见创建工作空间。工作空间所在地域需要与RDS MySQL一致。
- 创建阿里云Elasticsearch实例,并开启实例的自动创建索引功能。创建实例时,所选专有网络需要与RDS MySQL保持一致,具体操作步骤请参见创建阿里云Elasticsearch实例和开启自动创建索引。
步骤一:购买并创建独享资源组
- 登录DataWorks控制台。
- 选择相应地域后,在左侧导航栏,单击资源组列表。
- 参见购买独享数据集成资源组,购买独享数据集成资源。注意 购买时,所选地域需要与目标工作空间保持一致。
- 参见新增独享数据集成资源组,创建一个独享数据集成资源。本文使用的配置如下,其中资源组类型选择独享数据集成资源组。
- 单击已创建的独享资源组右侧的专有网络绑定,参见绑定专有网络,为该独享资源组绑定专有网络。独享资源部署在DataWorks托管的专有网络中。DataWorks需要与MySQL和Elasticsearch实例的专有网络连通才能同步数据。而MySQL和Elasticsearch实例在同一专有网络下,因此在绑定专有网络时,选择Elasticsearch实例所在专有网络和交换机即可。
- 单击已创建的独享资源组右侧的修改归属工作空间,参见修改归属工作空间,为该独享资源组绑定目标工作空间。
步骤二:添加数据源
- 进入DataWorks的数据集成页面。
- 在DataWorks控制台的左侧导航栏,单击工作空间列表。
- 找到目标工作空间,单击其右侧操作列下的进入数据集成。
- 在左侧导航栏,单击数据源。
- 在数据源管理页面,单击新增数据源。
- 在新增数据源对话框中,单击MySQL,进入新增MySQL数据源页面,填写数据源信息。数据源类型:本教程以阿里云实例模式为例,您也可以选择连接串模式。各配置项的详细说明请参见配置MySQL数据源。注意 如果您选择的是连接串模式,可以通过RDS MySQL的公网地址配置JDBC URL,但需要将独享资源组的EIP地址添加到MySQL的白名单中,详情请参见设置RDS MySQL白名单和添加独享数据集成资源组的白名单。配置完成后,可与独享资源组进行连通性测试。连通状态显示为可连通时,表示连通成功。
- 单击完成。
- 使用同样的方式添加Elasticsearch数据源。
参数 说明 Endpoint 阿里云Elasticsearch的访问地址,格式为: http://<实例的内网或公网地址>:9200
。实例的内网或公网地址可在基本信息页面获取,详情请参见查看实例的基本信息。注意 如果您使用的是公网地址,需要将独享资源组的EIP地址添加到阿里云Elasticsearch的公网地址访问白名单中,详情请参见配置ES公网或私网访问白名单和添加独享数据集成资源组的白名单。用户名 访问阿里云Elasticsearch实例的用户名,默认为elastic。 密码 对应用户的密码。elastic用户的密码在创建实例时设定,如果忘记可重置,重置密码的注意事项和操作步骤请参见重置实例访问密码。 说明 其他未提及的参数请自定义输入。
步骤三:配置并运行数据同步任务
- 在DataWorks的数据开发页面,新建一个业务流程。
具体操作步骤请参见管理业务流程。
- 展开新建的业务流程,右键单击数据集成,选择新建 > 离线同步。
- 在新建节点对话框中,输入节点名称,单击提交。
- 在页面上方工具栏,单击
图标。
- 确认后,配置数据同步脚本。详细配置说明请参见通过脚本模式配置任务。说明 您可以单击页面上方工具栏中的
图标,导入对应的脚本配置模板,并在模板的基础上进行修改,完成数据同步脚本配置。
以查询两张表的学生信息和考试信息为例,示例脚本如下。{ "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "10" }, "speed": { "concurrent": 1, "throttle": false } }, "steps": [ { "category": "reader", "name": "Reader", "parameter": { "column": [ "id", "name", "sex", "birth", "department", "address" ], "connection": [ { "datasource": "zl_test_rdsmysql", "querysql": [ "SELECT student.id,name,sex,birth,department,address,c_name,grade FROM student JOIN score on student.id=score.stu_id;" ], "table": [ "score" ] } ], "encoding": "UTF-8", "splitPk": "", "where": "" }, "stepType": "mysql" }, { "category": "writer", "name": "Writer", "parameter": { "batchSize": 1000, "cleanup": true, "column": [ { "name": "student_id", "type": "id" }, { "name": "sex", "type": "text" }, { "name": "name", "type": "text" }, { "name": "birth", "type": "integer" }, { "name": "quyu", "type": "text" }, { "name": "address", "type": "text" }, { "name": "cname", "type": "text" }, { "name": "grades", "type": "integer" } ], "datasource": "ES_data_source", "discovery": false, "index": "mysqljoin", "indexType": "_doc", "splitter": "," }, "stepType": "elasticsearch" } ], "type": "job", "version": "2.0" }
同步脚本的配置分为三个部分。配置 说明 setting
用来配置同步中的一些丢包和最大并发等参数。其中 errorLimit
的record
字段值默认为0,请将其修改为大一些的数值,比如10。Reader
用来配置MySQL Reader,使用 querysql
自定义筛选SQL。当您配置querysql
时,MySQL Reader会直接忽略table
、column
、where
和splitPk
条件的配置。datasource
通过querysql
解析出用户名和密码等信息,详情请参见MySQL Reader。Writer
用来配置Elasticsearch Writer,详情请参见Elasticsearch Writer。 index
:索引名称。indexType
:索引类型,7.0及以上版本的Elasticsearch必须使用_doc
。
- 保存配置脚本,单击右侧的调度配置,按照需求配置相应的调度参数。
- 配置执行同步任务所使用的资源组。
- 在脚本配置页面右侧,单击数据集成资源组配置。
- 选择方案为独享数据集成资源组。
- 选择独享数据集成资源组为您创建的独享资源组。
- 提交任务。
- 保存当前配置,单击
图标。
- 在提交新版本对话框中,填入备注。
- 单击确认。
- 保存当前配置,单击
- 单击
图标,运行任务。
任务运行过程中,可查看运行日志。运行成功后,显示如下结果。
步骤四:验证数据同步结果
- 登录目标阿里云Elasticsearch实例的Kibana控制台。具体操作步骤请参见登录Kibana控制台。
- 在左侧导航栏,单击Dev Tools(开发工具)。
- 在Console中,执行如下命令查看同步的数据。
POST /mysqljoin/_search?pretty { "query": { "match_all": {}} }
说明mysqljoin
为您在数据同步脚本中设置的index
字段的值。数据同步成功后,返回如下结果。
在文档使用中是否遇到以下问题
更多建议
匿名提交