通过数据集成导入数据至Elasticsearch

本文为您介绍如何通过数据集成导入离线Elasticsearch数据。

前提条件

  1. 准备阿里云账号,并创建账号的访问密钥。详情请参见开通DataWorks服务

  2. 开通MaxCompute,自动产生一个默认的MaxCompute数据源,并使用主账号登录DataWorks。

  3. 创建工作空间,您可以在工作空间中协作完成业务流程,共同维护数据和任务等。详情请参见创建工作空间

    说明

    如果您需要通过子账号创建数据集成任务,请赋予其相应的权限。详情请参见创建RAM用户空间级模块权限管控

  4. 准备好相关的数据源,详情请参见数据源配置

新建离线同步节点

  1. 进入数据开发页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 展开目标业务流程,右键单击数据集成,选择新建节点 > 离线同步

  3. 新建节点对话框中,输入名称并选择路径

    说明
    • 节点名称的长度不能超过128个字符。

    • 此处的路径为创建的业务流程,具体操作请参见创建周期业务流程

  4. 单击确认

配置离线同步节点

  1. 成功创建离线同步节点后,单击工具栏中的转换脚本

    转换脚本

  2. 单击提示对话框中的确认,即可进入脚本模式进行开发。

  3. 参照下文的脚本模板,根据自身需求进行配置。

    {
    "configuration": {
    "setting": {
      "speed": {
        "concurrent": "1", //作业并发数。
        "mbps": "1" //作业速率上限,此处1mbps = 1MB/s。
      }
    },
    "reader": {
      "parameter": {
        "connection": [
          {
            "table": [
              "es_table" //源端表名。
            ],
            "datasource": "px_mysql_OK" //数据源名,建议和添加的数据源名保持一致。
          }
        ],
        "column": [ //源端表的列名。
          "col_ip",
          "col_double",
          "col_long",
          "col_integer",
          "col_keyword",
          "col_text",
          "col_geo_point",
          "col_date"
        ],
        "where": "", //过滤条件。
      },
      "plugin": "mysql"
    },
    "writer": {
      "parameter": {
        "cleanup": true, //是否在每次导入数据到Elasticsearch时清空原有数据,全量导入或重建索引时,需要设置为true,同步增量时必须为false。
        "accessKey": "nimda", //如果使用了X-PACK插件,需要填写password;如果未使用,则填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,需要填写password。
        "index": "datax_test", // Elasticsearch的索引名称,如果之前没有,插件会自动创建。
        "alias": "test-1-alias", //数据导入完成后写入别名。
        "settings": {
          "index": {
            "number_of_replicas": 0,
            "number_of_shards": 1
          }
        },
        "batchSize": 1000, //每次批量数据的条数。
        "accessId": "default", //如果使用了X-PACK插件,需要填写username;如果未使用,则填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,需要填写username。
        "endpoint": "http://xxx.xxxx.xxx:xxxx", //Elasticsearch的连接地址,可以在控制台查看。
        "splitter": ",", //如果插入数据是array,则使用指定分隔符。
        "indexType": "default", //Elasticsearch中相应索引下的类型名称。
        "aliasMode": "append", //数据导入完成后增加别名的模式,append(增加模式),exclusive(只留这一个)。
        "column": [ //Elasticsearch中的列名,顺序和Reader中的Column顺序一致。
          {
            "name": "col_ip",
            "type": "ip"//文本类型,采用默认分词。
          },
          {
            "name": "col_double",
            "type": "string"
          },
          {
            "name": "col_long",
            "type": "long"
          },
          {
            "name": "col_integer",
            "type": "integer"
          },
          {
            "name": "col_keyword",
            "type": "keyword"
          },
          {
            "name": "col_text",
            "type": "text"
          },
          {
            "name": "col_geo_point",
            "type": "geo_point"
          },
          {
            "name": "col_date",
            "type": "date"
          }
        ],
        "discovery": false//是否自动发现,设置为true。
      },
      "plugin": "elasticsearch"//Writer插件的名称:Elasticsearch Writer,无需修改。
    }
    },
    "type": "job",
    "version": "1.0"
    }
  4. 单击保存图标后,再单击运行图标。

    说明
    • Elasticsearch仅支持以脚本模式导入数据。

    • 保存同步任务后,直接单击运行图标,任务会立刻运行。

      您也可以单击提交图标,提交同步任务至调度系统中,调度系统会按照配置属性在从第2天开始自动定时执行。

后续步骤

如果您需要使用其它类型的数据源配置同步任务,请参见配置Reader配置Writer模块的文档。