全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
表格存储

全量导出(DataX)

更新时间:2018-03-16 10:24:12

本章介绍如何将Table Store中的全量数据通过开源工具DataX导出到Elasticsearch中。

DataX是阿里巴巴集团内广泛使用的离线数据同步工具/平台,实现包括MySQL、Oracle、HDFS、Hive、Table Store、MaxCompute等各种异构数据源之间高效的数据同步功能。

途径

DataX

  • Reader:OTSSReader
  • Writer:ElasticsearchWriter

配置Table Store

无须配置。

配置Elasticsearch

无须配置。

配置DataX

  1. 登录VPC环境内的ECS,确定一个工作目录。

  2. 准备配置文件。

    DataX主要配置项描述
    job的控制参数-
    Reader的参数Reader配置otsstreamreader,包括name和parameter参数。parameter参数中包含了Reader所需的自定义参数。
    Writer的参数Writer配置elasticsearchwriter,包括name和parameter参数。parameter参数中包含了Writer所需的自定义参数。

    配置文件的格式如下所示。下述配置表示:

    • 导出Table Store实例your-instance中表table_name的所有行的三列(主键列:uid,属性列:name,phone_number)数据到Elasticsearch中。

    • 同步uid数,同步范围:从INF_MIN到INF_MAX,即导出所有主键的数据。

    1. {
    2. "job": {
    3. "setting": {
    4. "speed": {
    5. "channel": 1 # 通道数目,默认是1,可以根据自己数据量大小填写
    6. },
    7. "errorLimit": {
    8. "percentage": 0 # 允许的错误比例,一般设置为0即可
    9. }
    10. },
    11. "content": [
    12. {
    13. "reader": {
    14. "name": "ots", # 不能修改
    15. "parameter": {
    16. "datasource": "", # 数据集成中的数据源名称,需要提前设置好,这里有两种选择,一种是配置datasource数据源,一种是写明文的AccessKeyID等鉴权信息。建议使用数据源。如果配置为写明文方式,则需要新增四个配置项("endpoint""accessId":"""accessKey":"""instanceName")替换掉datasource
    17. "table": "", # Table Store中的表名
    18. "column": [ # 需要导出到Elasticsearch中的列名
    19. {
    20. "name": "uid" # Table Store中列名,此列需要导入到Elasticsearch
    21. },
    22. {
    23. "name": "name" # Table Store中列名,此列需要导入到Elasticsearch
    24. },
    25. {
    26. "name": "phone_number" # Table Store中列名,此列需要导入到Elasticsearch
    27. }
    28. ],
    29. "range": {
    30. "begin": [
    31. {
    32. "type": "INF_MIN" # Table Store中第一列主键的起始位置。如果要导出全量,这里需要配置为INF_MIN,如何导出部分,则按需要配置。如果有几列主键列,这里begin中就需要几项配置,但是当导出目标为Elasticsearch的时候仅支持单列主键,所以这里仅演示单列的配置
    33. }
    34. ],
    35. "end": [
    36. {
    37. "type": "INF_MAX" # Table Store中第一列主键的结束位置。如果要导出全量,这里需要配置为INF_MAX,如何导出部分,则按需要配置。
    38. }
    39. ],
    40. "split": [ # 用来配置Table Store的表的分区信息,可以加速导出,下一个版本会自动处理。
    41. ]
    42. }
    43. }
    44. },
    45. "writer": {
    46. "name": "elasticsearchwriter", # Reader插件的名称:ElasticsearchWriter,不需要修改
    47. "parameter": {
    48. "accessId": "OyR5xxxxxxxxXaXi", # AccessKeyID,如果不需要则填空值就行,阿里云Elasticsearch需要,无安全验证的开源Elasticsearch则不需要
    49. "accessKey": "Z3wVxxxxxxxxxxxxxxxxxxxxxxxxRZ", # AccessKeySecret,如果不需要则填空值就行,阿里云Elasticsearch需要,无安全验证的开源Elasticsearch则不需要
    50. "endpoint": "https://127.0.0.1:9200", # Elasticsearchendpoint
    51. "index": "school_index", # Elasticsearch的索引名称,比如这里的索引名称是school_index
    52. "type": "user_info", # Elasticsearch中相应索引下的类型名称,这里是user_info
    53. "cleanup": false, # 是否在每次导入数据到Elasticsearch的时候清空原有数据,全量导入/重建索引的时候需要设置为true,同步增量的时候必须为false
    54. "discovery":true, # 是否自动发信,设置为true
    55. "batchSize": 100, # 每批导出的格式
    56. "column": [ # Elasticsearch中的列名,顺序和Reader中的Column顺序一致
    57. {
    58. "name": "uid", # Table Store中的主键列是uid,这里也有同名uid,用typeid表示这一列是主键
    59. "type": "id" # id表示这一列是主键,id不是Elasticsearch的内置类型,是ElasticsearchWriter提供的虚拟类型
    60. },
    61. {
    62. "name": "name", # 对应于Table Store中的属性列:name
    63. "type": "text" # 文本类型,采用默认分词
    64. },
    65. {
    66. "name": "phone_number", # 对应于Table Store中的属性列:phone
    67. "type": "text" # 文本类型,采用默认分词
    68. }
    69. ]
    70. }
    71. }
    72. }
    73. ]
    74. }
    75. }
  3. 完成配置文件后,执行DataX命令。

    1. python /home/admin/datax3/bin/datax.py -j"-Xms4g -Xmx4g" /path/to/job_config.json
  4. 检查数据是否已经导入到Elasticsearch中。

    通过如下命令查看导入Elasticsearch中的文档数:

    1. # 把命令中的${index_name}替换成你自己的index_name,把${type_name}替换成你自己的type_name。
    2. curl -XGET https://endpoint/${index_name}/${type_name}/_count?pretty -d '
    3. {
    4. "query": {
    5. "match_all": {}
    6. }
    7. }'

    返回结果类似如下所示:

    1. {
    2. "count" : 1000, # Elasticsearchindex_name索引的type_name类型中的doc
    3. "_shards" : { # 这个是Elasticsearch返回数据相关的meta值,表示总共有5shard,全部成功返回了结果
    4. "total" : 5,
    5. "successful" : 5,
    6. "skipped" : 0,
    7. "failed" : 0
    8. }
    9. }

    如果以上数据准确,则可以配置定时导出任务。

  5. 配置定时导出任务。

    从Table Store导出全量数据到Elasticsearch中的场景,一般是一次性的,不需要配置定时任务。

    如果您需要每隔一段时间重建索引,则可以配置crontab,定时执行导出任务的命令。

本文导读目录