全部产品
阿里云办公

全量导出(脚本模式)

更新时间:2017-12-04 19:13:18

数据集成(Data Integration)产品提供数据同步服务,有脚本模式向导模式两种方式。脚本模式更灵活,向导模式更简单。

本节介绍如何将Table Store中的全量数据通过数据集成的脚本模式导出到Elasticsearch中。

步骤1. 创建Table Store数据源

说明:

  • 如果已经创建了Table Store的数据源,可以跳过此步骤。
  • 如果您不希望创建数据源,也可以在后续的配置页面中配置相应的endpoint、instanceName、AccessKeyID和AccessKeySecret。

创建数据源的具体步骤,请参见创建Table Store数据源

步骤2. 创建导出任务

  1. 登录数据集成控制台

  2. 同步任务页面,选择脚本模式

    脚本模式

  3. 在弹出的导入模板对话框中,来源类型选择OTS目标类型选择Elasticsearch

  4. 单击确认,进入配置页面。

步骤3. 完善配置项

  1. 在配置界面,已经提前嵌入了OTSReader和ElasticsearchWriter的模板,请参考以下注释完成配置。

    1. {
    2. "type": "job",
    3. "version": "1.0",
    4. "configuration": {
    5. "setting": {
    6. "errorLimit": {
    7. "record": "0" # 允许出错的个数,当错误超过这个数目的时候同步任务会失败。
    8. },
    9. "speed": {
    10. "mbps": "1", # 每次同步任务的最大流量。
    11. "concurrent": "1" # 每次同步任务的并发度。
    12. }
    13. },
    14. "reader": {
    15. "plugin": "otsstream", # Reader插件的名称。
    16. "parameter": {
    17. "datasource": "", # TableStore数据源。
    18. "dataTable": "", # TableStore中的表名。
    19. "column": [ # 配置要导出到Elasticsearch中的列名。
    20. {
    21. "name": ""
    22. },
    23. {
    24. "name": ""
    25. },
    26. ],
    27. "range": { # 配置本次全量导出的数据范围。支持全部数据导出和部分数据导出。
    28. "split": [], # 性能优化项,可以暂时不考虑,下一版本会自动处理。
    29. "begin": [ # 待导出数据的起始位置,begin中配置项个数和表的主键列个数一致。
    30. {
    31. "type": "INF_MIN" # 如果是从最小的数据开始导出,则这里设置为INF_MIN
    32. }
    33. ],
    34. "end": [ # 待导出数据的结束位置,end中配置项的个数需要和表的主键列个数保持一致。
    35. {
    36. "type": "INF_MIN" # 如果是一直导出到最大数据结束,则这里设置为INF_MAX
    37. }
    38. ]
    39. }
    40. }
    41. },
    42. "writer": {
    43. "plugin": "elasticsearch", # Writer插件的名称:ElasticSearchWriter,不需要修改。
    44. "parameter": {
    45. "endpoint": "",# ElasticSearchendpoint,控制台上有。
    46. "accessId": "",# 如果使用了X-PACK插件,则这里需要填写username,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,这里需要填写username
    47. "accessKey": "", # 如果使用了X-PACK插件,则这里需要填写password,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,这里需要填写password
    48. "index": "", # ElasticSearch的索引名称,如果之前没有,插件会自动创建。
    49. "indexType": "", # ElasticSearch中相应索引下的类型名称
    50. "cleanup": true, # 是否在每次导入数据到ElasticSearch的时候清空原有数据,全量导入/重建索引的时候需要设置为true,同步增量的时候必须为false,这里因为是同步,则需要设置为false
    51. "discovery": false, # 是否自动发现,设置为true
    52. "batchSize": 1000, # 每批导出的个数
    53. "splitter": ",", # 如果插入数据是array,就使用指定分隔符。
    54. "column": [ # ElasticSearch中的列名,顺序和Reader中的Column顺序一致
    55. {
    56. "name": "uid", # TableStore中的主键列是uid,这里也有同名uid,用typeid表示这一列是主键
    57. "type": "id" # id表示这一列是主键,id不是ElasticSearch的内置类型,是ElasticSearchWriter提供的虚拟类型
    58. },
    59. {
    60. "name": "name", # 对应于TableStore中的属性列:name
    61. "type": "text" # 文本类型,采用默认分词
    62. }
    63. ]
    64. }
    65. }
    66. }
    67. }

    说明:其他配置的解释请参见ElasticsearchWriter配置项

  2. 单击保存,保存任务。

步骤4. 配置调度资源

  1. 登录DataWorks控制台

  2. 调度资源列表页面,单击新增调度资源

    新增调度资源

  3. 在弹出的对话框中,输入资源名称,并选择归属项目

  4. 在添加成功的页面,单击管理服务器

  5. 管理服务器对话框中,单击增加服务器

    增加服务器

  6. 配置服务器参数,如下所示。

    配置参数

    参数说明如下:

    参数描述
    网络类型选择专有网络
    ECS UUID已购买的ECS的UUID。
    UUID获取方式:登录ECS,以root身份执行命令bash dmidecode | grep UUID
    机器IP已购买的ECS的内网IP。
  7. 调度资源添加完成后,单击服务器初始化,并且按照提示步骤进行初始化。

    初始化

  8. 单击服务器管理,并在弹出的对话框中单击刷新

    服务状态显示为正常,表示调度资源配置成功。

    状态正常

步骤5. 提交任务

全量导出到Elasticsearch的场景,一般有两个。

  • 场景一:Elasticsearch中数据结构发生变化或者扩容的时候,重建索引。

    这个场景一般是人工触发的,此时不需要配置调度任务,每次需要执行的时候点击任务配置页面上方的运行即可。

  • 场景二:定时将全量数据导入Elasticsearch。

    这个场景需要配置调度任务,一般配置为每天一次或者每周一次。配置步骤如下所示:

  1. 登录数据集成控制台

  2. 同步任务页面,双击刚刚创建的同步任务。

  3. 单击任务上方的提交按钮。

  4. 在弹出的对话框中,配置调度参数。参数说明如下:

    参数描述
    调度类型选择周期调度
    自动重跑勾选。表示当失败的时候会默认重试3次,每次间隔2分钟。
    生效日期推荐使用默认值,默认从1970-01-01到一百年后。
    调度周期选择
    起始时间选择00:00,表示每天0点开始执行全量导出任务。
    依赖属性如果有依赖则填写,没有则不用填。
    跨周期依赖选择自依赖,等待上一调度周期结束,才能继续运行
  5. 单击确认

    周期性的同步任务配置完成,当前配置文件显示为只读状态

步骤6. 绑定调度资源

  1. 单击页面上方的运维中心

  2. 在左侧导航栏,选择任务列表 > 周期任务,可以查看新创建的同步任务。

  3. 在该任务的右侧选择修改资源组

    修改资源组

  4. 在弹出的对话框中,选择资源组,即刚刚创建的调度资源。

    选择资源组

  5. 单击确认,绑定成功。

至此,Table Store数据可以在延迟5~10分钟的基础上自动同步到Elasticsearch中了。

步骤7. 验证结果

周期任务从第二天的00:00点开始执行。执行完一个任务后,在ECS上通过如下命令查看Elasticsearch中的数据量:

  1. curl -XGET http://endpoint/index_name/type_name/_count?pretty -d '
  2. {
  3. "query": {
  4. "match_all": {}
  5. }
  6. }'

返回类似如下结果:

  1. {
  2. "count" : 1000, # ElasticSearch中index_name索引的type_name类型中的doc数
  3. "_shards" : { # 这个是ElasticSearch返回数据相关的meta值,表示总共有5个shard,全部成功返回了结果
  4. "total" : 5,
  5. "successful" : 5,
  6. "skipped" : 0,
  7. "failed" : 0
  8. }
  9. }
本文导读目录