数据集成(Data Integration)产品提供数据同步服务,有脚本模式和向导模式两种方式。脚本模式更灵活,向导模式更简单。

本节介绍如何将Table Store中的全量数据通过数据集成的脚本模式导出到Elasticsearch中。

步骤1. 创建Table Store数据源

说明
  • 如果已经创建了Table Store的数据源,可以跳过此步骤。
  • 如果您不希望创建数据源,也可以在后续的配置页面中配置相应的endpoint、instanceName、AccessKeyID和AccessKeySecret。

创建数据源的具体步骤,请参见创建Table Store数据源

步骤2. 创建导出任务

  1. 登录数据集成控制台
  2. 同步任务页面,选择脚本模式
  3. 在弹出的导入模板对话框中,来源类型选择OTS,目标类型选择Elasticsearch。
  4. 单击确认,进入配置页面。

步骤3. 完善配置项

  1. 在配置界面,已经提前嵌入了OTSReader和ElasticsearchWriter的模板,请参考以下注释完成配置。
    {
    "type": "job",
    "version": "1.0",
    "configuration": {
     "setting": {
       "errorLimit": {
         "record": "0"  # 允许出错的个数,当错误超过这个数目的时候同步任务会失败。
       },
       "speed": {
         "mbps": "1",  # 每次同步任务的最大流量。
         "concurrent": "1" # 每次同步任务的并发度。
       }
     },
     "reader": {
       "plugin": "otsstream",  # Reader插件的名称。
       "parameter": {
         "datasource": "",  # TableStore数据源。
         "dataTable": "",  # TableStore中的表名。
         "column": [  # 配置要导出到Elasticsearch中的列名。
             {
                 "name": ""
             },
             {
                  "name": ""
               },
         ],
         "range": {  # 配置本次全量导出的数据范围。支持全部数据导出和部分数据导出。
           "split": [],  # 性能优化项,可以暂时不考虑,下一版本会自动处理。
           "begin": [  # 待导出数据的起始位置,begin中配置项个数和表的主键列个数一致。
             {
               "type": "INF_MIN"  # 如果是从最小的数据开始导出,则这里设置为INF_MIN。
             }
           ],
           "end": [  # 待导出数据的结束位置,end中配置项的个数需要和表的主键列个数保持一致。
             {
               "type": "INF_MIN"  # 如果是一直导出到最大数据结束,则这里设置为INF_MAX。
             }
           ]
         }
       }
     },
     "writer": {
       "plugin": "elasticsearch",  # Writer插件的名称:ElasticSearchWriter,不需要修改。
       "parameter": {
         "endpoint": "",# ElasticSearch的endpoint,控制台上有。
         "accessId": "",# 如果使用了X-PACK插件,则这里需要填写username,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,这里需要填写username。
         "accessKey": "", # 如果使用了X-PACK插件,则这里需要填写password,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,这里需要填写password。  
         "index": "",  # ElasticSearch的索引名称,如果之前没有,插件会自动创建。
         "indexType": "", # ElasticSearch中相应索引下的类型名称
         "cleanup": true, # 是否在每次导入数据到ElasticSearch的时候清空原有数据,全量导入/重建索引的时候需要设置为true,同步增量的时候必须为false,这里因为是同步,则需要设置为false。
         "discovery": false, # 是否自动发现,设置为true
         "batchSize": 1000, # 每批导出的个数
         "splitter": ",",  # 如果插入数据是array,就使用指定分隔符。
         "column": [  # ElasticSearch中的列名,顺序和Reader中的Column顺序一致
           {
             "name": "uid",  # TableStore中的主键列是uid,这里也有同名uid,用type:id表示这一列是主键
             "type": "id"  # id表示这一列是主键,id不是ElasticSearch的内置类型,是ElasticSearchWriter提供的虚拟类型
           },
           {
             "name": "name", # 对应于TableStore中的属性列:name
             "type": "text"  # 文本类型,采用默认分词
           }
         ]
       }
     }
    }
    }
    说明 其他配置的解释请参见 ElasticsearchWriter配置项
  2. 单击保存,保存任务。

步骤4. 配置调度资源

  1. 登录DataWorks控制台
  2. 调度资源列表页面,单击新增调度资源
  3. 在弹出的对话框中,输入资源名称,并选择归属项目
  4. 在添加成功的页面,单击管理服务器
  5. 管理服务器对话框中,单击增加服务器
  6. 配置服务器参数,参数说明如下:
    参数 描述
    网络类型 选择专有网络
    ECS UUID 已购买的ECS的UUID。

    UUID获取方式:登录ECS,以root身份执行命令bash dmidecode | grep UUID

    机器IP 已购买的ECS的内网IP。
  7. 调度资源添加完成后,单击服务器初始化,并且按照提示步骤进行初始化。
  8. 单击服务器管理,并在弹出的对话框中单击刷新

    服务状态显示为正常,表示调度资源配置成功。

步骤5. 提交任务

全量导出到Elasticsearch的场景,一般有两个。

  • 场景一:Elasticsearch中数据结构发生变化或者扩容的时候,重建索引。

    这个场景一般是人工触发的,此时不需要配置调度任务,每次需要执行的时候点击任务配置页面上方的运行即可。

  • 场景二:定时将全量数据导入Elasticsearch。

    这个场景需要配置调度任务,一般配置为每天一次或者每周一次。配置步骤如下所示:

    1. 登录数据集成控制台
    2. 同步任务页面,双击刚刚创建的同步任务。
    3. 单击任务上方的提交按钮。
    4. 在弹出的对话框中,配置调度参数。参数说明如下:
      参数 描述
      调度类型 选择周期调度
      自动重跑 勾选。表示当失败的时候会默认重试3次,每次间隔2分钟。
      生效日期 推荐使用默认值,默认从1970-01-01到一百年后。
      调度周期 选择
      起始时间 选择00:00,表示每天0点开始执行全量导出任务。
      依赖属性 如果有依赖则填写,没有则不用填。
      跨周期依赖 选择自依赖,等待上一调度周期结束,才能继续运行。
    5. 单击确认

      周期性的同步任务配置完成,当前配置文件显示为只读状态。

步骤6. 绑定调度资源

  1. 单击页面上方的运维中心
  2. 在左侧导航栏,选择任务列表 > 周期任务 ,可以查看新创建的同步任务。
  3. 在该任务的右侧选择修改资源组

  4. 在弹出的对话框中,选择资源组,即刚刚创建的调度资源。

  5. 单击确认,绑定成功。

至此,Table Store数据可以在延迟5~10分钟的基础上自动同步到Elasticsearch中了。

步骤7. 验证结果

周期任务从第二天的00:00点开始执行。执行完一个任务后,在ECS上通过如下命令查看Elasticsearch中的数据量:

curl -XGET http://endpoint/index_name/type_name/_count?pretty  -d '
{
    "query": {
        "match_all": {}
    }
}'

返回类似如下结果:

{
  "count" : 1000,  # ElasticSearch中index_name索引的type_name类型中的doc数
  "_shards" : {     # 这个是ElasticSearch返回数据相关的meta值,表示总共有5个shard,全部成功返回了结果
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  }
}