阿里云上拥有丰富的云存储、云数据库产品。当您需要对这些产品中的数据进行分析和搜索时,可以通过数据集成实现最快5分钟一次的离线数据采集,并同步到阿里云Elasticsearch(简称ES)中。

背景信息

阿里云ES支持导入的阿里云离线数据源包括:
  • 阿里云云数据库(MySQL、PostgreSQL、SQL Server、PPAS、MongoDB、HBase)
  • 阿里云DRDS
  • 阿里云MaxCompute
  • 阿里云OSS
  • 阿里云Tablestore
  • 自建HDFS、Oracle、FTP、DB2,及以上数据库的自建版本
注意 进行数据同步时可能会产生公网流量费用。

概述

完成离线数据导入,您需要完成以下几步操作:

  1. 购买一台可以与专有网络VPC(Virtual Private Cloud)内的阿里云ES交互的阿里云服务器ECS(Elastic Compute Service)。这台ECS将获取数据源数据并执行写阿里云ES数据的任务(该任务将由数据集成服务统一下发)。
  2. 开通数据集成服务,并且将ECS作为一个可以执行任务的资源,注册到数据集成服务中去。
  3. 配置一个数据同步脚本,并且让其周期性地执行起来。

前提条件

  • 请确保您已经创建了目标阿里云ES实例。

    如果还未创建,请先创建阿里云Elasticsearch实例

  • 请确保您已经开启了目标阿里云ES实例的自动创建索引功能。

    如果还未开启,请先开启自动创建索引功能。

  • 请确保您已经购买了一台ECS服务器,需要分配一个公网IP地址或开通弹性公网IP,并且与阿里云ES实例在同一VPC下。
    如果还未购买,请先购买ECS实例
    注意
    • 为了节省成本,您可以复用已有的ECS服务器。建议ECS实例使用CentOS6、CentOS7或者Aliyun Linux系统。
    • 如果您添加的ECS实例需要执行MaxCompute任务或者同步任务,需要确保当前ECS实例的Python版本是python2.6或2.7版本(CentOS5的版本为2.4 ,其余OS自带了2.6及以上版本)。
  • 请确保您已经开通了DataWorks的数据集成服务。

    如果还未开通,请先开通数据集成服务,详情请参见创建工作空间

  • 请确保您已经准备了需要进行数据导入的数据源。

    如果还未准备,请先准备数据源,详情请参见支持的数据源

配置调度资源

  1. 使用主账号登录DataWorks控制台
  2. 工作空间列表页面,单击对应工作空间右侧操作栏下的进入数据集成
    如果您已经开通过数据集成或者DataWorks产品,将会看到如下概览页面。已经购买页面

    如果您未开通过数据集成服务或DataWorks产品,请按照步骤进行开通,此开通动作会产生费用,可按照费用提示进行预算评估。

  3. 数据集成控制台中,单击左侧导航栏中的自定义资源组
  4. 自定义资源组管理页面,单击右上角的新增自定义资源组,将之前VPC内的ECS配置成为一个调度资源。
    注意 新增自定义资源组功能为DataWorks专业版功能,使用前需要先将DataWorks升级至专业版。

    详细配置方法请参见新增自定义资源组

配置数据源

  1. 数据集成控制台中,单击左侧导航栏中的数据源
  2. 数据源管理页面,单击右上角的新增数据源
  3. 新增数据源对话框中,选择数据源类型,并填入数据源信息。
    详情请参见数据源配置
  4. 单击测试连通性
    连通性测试支持的数据源类型,以及数据源连通性测试常见问题请参见数据源测试连通性
  5. 测试连通成功后,单击完成即可完成数据源的配置。

配置并运行数据同步脚本

  1. 数据集成控制台中,单击左侧导航栏中的首页
  2. 单击新建同步任务
    新建同步任务
  3. 新建节点对话框中,填写节点信息。
    配置项 说明
    节点类型 选择离线同步
    节点名称 自定义节点名称。
    目标文件夹 选择业务流程 > 您创建的业务流程名称 > 数据集成文件夹。如果您还未创建过业务流程,需要首先创建一个业务流程,详情请参见新建业务流程
  4. 单击提交
  5. 在当前页面,配置数据同步任务。
    您可以使用向导模式或者脚本模式来配置任务,将数据库作为Reader插件,阿里云ES作为Writer插件,详情请参见向导模式配置脚本模式配置
    本文使用脚本模式进行配置。单击页面中的转换脚本图标,可将向导模式转换成脚本模式,配置详情如下。
    {
        "type": "job",
        "steps": [
            {
                "stepType": "odps",
                "parameter": {
                    "partition": [],
                    "datasource": "odps_first",
                    "column": [],
                    "emptyAsNull": false,
                    "table": ""
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "elasticsearch",
                "parameter": {
                    "accessId": "",
                    "endpoint": "http://es-cn-xxxxx.elasticsearch.aliyuncs.com:9200",
                    "indexType": "",
                    "accessKey": "",
                    "cleanup": true,
                    "discovery": false,
                    "column": [
                        {
                            "name": "",
                            "type": ""
                        }
                    ],
                    "index": "",
                    "batchSize": 1000,
                    "splitter": ","
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "concurrent": 2,
                "throttle": false
            }
        }
    }

    阿里云ES的配置参数详情请参见配置Elasticsearch Writer

  6. 保存配置脚本,单击右侧的调度配置,按照需求填写相应的周期性执行配置。
    调度配置
    注意
    • 在提交任务前,必须配置任务调度依赖的上游节点,详情请参见依赖关系
    • 如果您希望对任务进行周期性调度,需要配置任务的时间属性,包括任务的具体执行时间、调度周期、生效周期等。
    • 周期任务将于配置任务开始的第二天00:00,按照您的配置规则生效执行。
  7. 单击提交图标图标,提交任务。
  8. 单击页面右上角的运维中心,前往周期任务页面,找到您提交的Job,将其调度资源从默认修改为您配置好的调度资源。
    运维中心-周期任务页面

查看结果

  1. 登录目标阿里云ES实例的Kibana控制台。
    登录控制台的具体步骤请参见登录Kibana控制台
  2. 单击左侧导航栏的Dev Tools(开发工具)。
  3. Console中,执行以下命令查询同步的数据。
    GET /<your_index_name>/_search

    <your_index_name>为您在数据同步脚本中配置的ES索引名称。