阿里云上拥有丰富的云存储、云数据库产品。如果您希望针对这些产品中的数据进行分析和搜索,可以通过DataWorks的数据集成服务,将离线数据同步到Elasticsearch(简称ES)中,最快可达到5分钟一次。

注意 进行数据同步时可能会产生公网流量费用,请您知晓。

概述

完成离线数据的分析与搜索,您需要完成以下几步操作:

  1. 创建一个数据库,您可以选择使用阿里云的RDS数据库,也可以在本地服务器上自建数据库。本文档以RDS MySQL数据库为例,使用JOIN获取两张表数据,同步数据到ES中,表字段及数据如下所示。
    图 1. 表一
    表一
    图 2. 表二
    表二
  2. 购买一台可以与VPC内的ES交互的ECS,这台ECS将获取数据源数据并执行写ES数据的任务(该任务将由数据集成系统统一下发)。
  3. 开通DataWorks的数据集成服务,并且将ECS作为一个可以执行任务的资源,注册到数据集成服务中去。
  4. 配置一个数据同步的脚本,并且让其可以周期性的执行起来。
  5. 创建一个ES实例,用来存储数据集成系统同步成功的数据。

准备工作

  1. 创建专有网络和交换机
  2. 创建阿里云Elasticsearch实例
    注意 地域专有网络虚拟交换机与您第一步中创建的专有网络保持一致。
    购买ES实例
  3. 购买一台与阿里云ES实例处于同一个VPC内的ECS服务器,并分配一个公网IP或开通弹性IP。
    说明
    • 为了节省您的成本,您可以复用已有的ECS服务器。
    • 建议使用CentOS6、CentOS7或者AliyunOS。
    • 如果您添加的ECS需要执行MaxCompute任务或者同步任务,需要检查当前ECS的Python版本是否是Python2.6或2.7的版本(CentOS5的版本为2.4 ,其余OS自带了2.6以上版本)。
    • 请确保ECS分配了公网IP。

配置资源组和数据源

  1. 登录DataWorks控制台,进入工作空间列表页面。
    • 如果您已经开通过DataWorks数据集成产品,您将会看到如下页面。

      已开通DataWorks数据集成
    • 如果您未开通过DataWorks数据集成产品,您将会看到如下页面。您需要按照步骤开通数据集成服务,此开通动作会产生费用,请您按照费用提示进行预算评估。

      未开通DataWorks数据集成
  2. 选择DataWorks工作空间,单击进入数据集成
  3. 数据集成页面,选择左侧导航栏中的资源组,单击新增自定义资源组
  4. 配置资源组。
    1. 添加服务器

      根据页面提示,输入资源组名称和服务器信息。此服务器为您已经购买的ECS服务器,服务器信息说明如下。

      新增资源组
      名称 描述
      ECS UUID 连接ECS实例,执行 dmidecode | grep UUID,取返回值。
      机器 IP/机器CPU(核)/机器内存(GB) 您ECS实例的公网IP/CPU/内存。您可以在ECS控制台上单击实例名称,在配置信息模块,找到相关信息。
    2. 安装Agent

      按照页面提示,完成安装Agent。其中第五步为开通服务器的8000端口,可以跳过,保持系统默认即可。

    3. 检查连通
  5. 配置MySQL数据库和ES白名单。

    添加该资源组的IP地址到您的数据库/ES实例白名单中,确保网络连通。

  6. 选择左侧导航栏的数据源,单击新增数据源
  7. 单击MySQL,进入新增MySQL数据源页面,填写数据源信息。填写数据源信息

    数据源类型:本文档以阿里云数据库(RDS)为例,您也可以选择有公网IP无公网IP。各配置项的详细信息请参见配置MySQL数据源

    注意 如果测试连通不成功,请检查是否配置了数据库白名单。

配置同步任务

  1. 以开发者身份进入DataWorks控制台
  2. 工作空间列表页面,单击右侧操作栏下的进入数据开发
  3. 数据开发页面,单击新建 > 业务流程新建图标
  4. 新建业务流程对话框中,输入业务名称描述,单击新建
  5. 展开业务流程,右键单击数据集成,选择新建数据集成节点 > 数据同步
  6. 新建节点对话框中,输入节点名称,单击提交
  7. 在业务配置页面,单击工具栏中的转换脚本图标(转换脚本图标)。
  8. 确认后,配置数据同步脚本。

    具体配置请参见脚本模式配置

    以查询两张表的学生信息和考试信息为例,示例脚本如下:
    {
       "type": "job",
       "steps": [
           {
               "stepType": "mysql",
               "parameter": {
                   "column": [
                       "id",
                       "name",
                       "sex",
                       "birth",
                       "department",
                       "address"
                   ],
                   "connection": [
                       {
                           "querysql":["SELECT student.id,name,sex,birth,department,address,c_name,grade FROM student JOIN score on student.id=score.stu_id;"],
                           "datasource": "zl_****_rdsmysql",
                           "table": [
                               "score"
                           ]
                       }
                   ],
                   "where": "",
                   "splitPk": "",
                   "encoding": "UTF-8"
               },
               "name": "Reader",
               "category": "reader"
           },
           {
               "stepType": "elasticsearch",
               "parameter": {
                   "accessId": "elastic",
                   "endpoint": "http://es-cn-0p*********2dpxtx.elasticsearch.aliyuncs.com:9200",
                   "indexType": "score",
                   "accessKey": "******",
                   "cleanup": true,
                   "discovery": false,
                   "column": [
                       {
                           "name":"student_id",
                           "type":"id"
                       },
                        {
                           "name": "sex",
                           "type": "text"
                       },
                       {
                           "name": "name",
                           "type": "text"
                       },
                       {
                           "name": "birth",
                           "type": "integer"
                       },
                       {
                           "name": "quyu",
                           "type": "text"
                       },
                       {
                           "name": "address",
                           "type": "text"
                       },
                       {
                           "name": "cname",
                           "type": "text"
                       },
                       {
                           "name": "grades",
                           "type": "integer"
                       }
                   ],
                   "index": "mysqljoin",
                   "batchSize": 1000,
                   "splitter": ","
               },
               "name": "Writer",
               "category": "writer"
           }
       ],
       "version": "2.0",
       "order": {
           "hops": [
               {
                   "from": "Reader",
                   "to": "Writer"
               }
           ]
       },
       "setting": {
           "jvmOption": "-Xms1024m -Xmx1024m",
           "errorLimit": {
               "record": ""
           },
           "speed": {
               "throttle": false,
               "concurrent": 1
           }
       }
    }

    同步脚本的配置分为三个部分:

    • Reader:用来配置MySQL Reader,使用querysql自定义筛选SQL。当您配置querysql时,MySQL Reader直接忽略tablecolumnwheresplitPk条件的配置。即querysql优先级大于tablecolumnwheresplitPk选项。datasource通过querysql解析出用户名和密码等信息。
    • Writer:用来配置Elasticsearch Writer
      • endpoint:ES的内网或外网地址。需要在ES的安全配置页面,配置ES的私网/公网访问白名单。
      • accessId/accessKey:ES的访问用户名(默认为elastic)和密码。
      • index:ES实例的索引,您需要使用该索引名称访问ES数据。
      • 由于Reader声明从源中读字段,依次放入数组中;Writer从数组中取出数据,依次写入目标的属性中,因此ReaderWriter属性必须按照顺序对应上。
    • setting:用来配置同步中的一些丢包和最大并发等。
  9. 同步脚本配置完成后,单击页面右侧的配置任务资源组,选择您配置的资源组名称,单击运行,将MySQL中的数据同步到ES中。选择资源组

验证

  1. 进入ES实例的Kibana控制台
  2. 单击左侧导航栏的Dev Tools
  3. Console中,执行以下命令查看同步的数据。
    POST /mysqljoin/_search?pretty
    {
    "query": { "match_all": {}}
    }

    mysqljoin为您同步数据时,设置的index字段的值。

    查看同步的数据

常见问题

  • 同步过程中出现无法连接数据库的相关错误。

    解决方法:将您资源组中所使用的ECS服务器的内网IP和外网IP,都添加到您数据库的白名单中。

  • 同步过程中无法连通ES实例的相关错误。

    解决方法:按照下面步骤进行排查。

    1. 检查在运行同步脚本之前,是否在页面右侧的配置任务资源组中选择了您前面步骤创建的资源组。
      • 是,执行下一步。
      • 否,单击页面右侧的配置任务资源组,选择您前面步骤创建的资源组。完成后单击运行
    2. 检查是否在ES实例的白名单中,添加了资源组的IP地址。
      • 是,执行下一步。
      • 否,将您所使用的资源组的IP地址,添加到ES实例的白名单中。
        注意 如果您使用的是内网地址,请在ES的安全配置页面,配置ES的系统白名单。如果您是用的是外网地址,请在ES的安全配置页面,配置ES的公网地址访问白名单(资源组的IP地址)。
    3. 检查您的同步脚本配置是否正确。包括endpoint(ES实例的内网或外网地址)、accessId(ES实例的访问用户名,默认为elastic)和accessKey(ES实例的访问密码)。