全部产品
阿里云办公

增量同步(DataX)

更新时间:2017-12-04 19:19:02

本章介绍如何将Table Store中的增量数据通过开源工具DataX同步到Elasticsearch中。

DataX是阿里巴巴集团内广泛使用的离线数据同步工具/平台,实现包括MySQL、Oracle、HDFS、Hive、Table Store、MaxCompute等各种异构数据源之间高效的数据同步功能。

途径

DataX

  • Reader:OTSStreamReader
  • Writer:ElasticsearchWriter

限制

  • Table Store数据变化类型

    • 仅支持PUT(新增),UPDATE(更新)两种操作。
    • 不支持DELETE操作。
  • 增量同步任务延时

    • Table Store Stream是实时增量通道,但是调度采用数据集成,最小调度时间是5分钟。并且,Table Store Stream Reader的DataX插件限制了最快只能处理5分钟前的数据,所以目前同步任务会有5~10分钟的延迟。

配置Table Store

无须配置。

配置Elasticsearch

无须配置。

配置DataX

  1. 登录VPC环境内的ECS,确定一个工作目录。

  2. 准备配置文件。

    DataX主要配置项描述
    job的控制参数-
    Reader的参数Reader配置otsstreamreader,包括name和parameter参数。parameter参数中包含了Reader所需的自定义参数。
    Writer的参数Writer配置elasticsearchwriter,包括name和parameter参数。parameter参数中包含了Writer所需的自定义参数。

    配置文件的格式如下所示。下述配置表示:

    • 同步Table Store实例your-instance中表table_name的三列(主键列:uid,属性列:name,phone)的更新值(新增,更新,删除)到Elasticsearch中。

    • 同步时间戳:1510036262000(2017/11/07 14:31:21)到 1510036881000(2017/11/07 14:41:21)之间发生变化的数据。

      1. {
      2. "job": {
      3. "setting": {
      4. "speed": {
      5. "channel": 1 # 通道数目,默认是1,可以根据自己数据量大小填写
      6. },
      7. "errorLimit": {
      8. "percentage": 0 # 允许的错误比例,一般设置为0即可
      9. }
      10. },
      11. "content": [
      12. {
      13. "reader": {
      14. "name": "otsstreamreader", # 读取插件名称:OTSStreamReader
      15. "parameter": {
      16. "endpoint": "https://your-instance.cn-hangzhou.ots.aliyuncs.com", # Table Store的实例名称
      17. "accessId": "OyR5xxxxxxxxXaXi", # 阿里云AccessKeyID
      18. "accessKey": "Z3wVxxxxxxxxxxxxxxxxxxxxxxxxRZ", # 阿里云AccessKeySecret
      19. "instanceName": "your-instance.", # Table Store的实例名称
      20. "dataTable": "table_name", # Table Store中需要同步到Elasticsearch的表名
      21. "statusTable": "TableStoreStreamReaderStatusTable", # 存储TableStore Stream状态的表,一般不需要修改
      22. "startTimestampMillis": 1510036262000, # 开始导出的时间点
      23. "endTimestampMillis": 1510036881000, # 结束导出的时间点
      24. "mode": "single_version_and_update_only", # Table Store Stream导出数据的格式,目前Elasticsearch只能接收这种格式的,这个不需要修改
      25. "column":[ # 需要导出Table Store中的哪些列到Elasticsearch中去
      26. {"name":"uid"},
      27. {"name":"name"},
      28. {"name":"phone"}
      29. ],
      30. "isExportSequenceInfo": false, # single_version_and_update_only 模式下只能是false
      31. "maxRetries": 30 # 最大重试次数
      32. }
      33. },
      34. "writer": {
      35. "name": "elasticsearchwriter", # Reader插件的名称:ElasticsearchWriter,不需要修改
      36. "parameter": {
      37. "accessId": "OyR5xxxxxxxxXaXi", # AccessKeyID,如果不需要则填空值就行,阿里云Elasticsearch需要,无安全验证的开源Elasticsearch则不需要
      38. "accessKey": "Z3wVxxxxxxxxxxxxxxxxxxxxxxxxRZ", # AccessKeySecret,如果不需要则填空值就行,阿里云Elasticsearch需要,无安全验证的开源Elasticsearch则不需要
      39. "endpoint": "https://127.0.0.1:9200", # Elasticsearchendpoint
      40. "index": "school_index", # Elasticsearch的索引名称,比如这里的索引名称是school_index
      41. "type": "user_info", # Elasticsearch中相应索引下的类型名称,这里是user_info
      42. "cleanup": false, # 是否在每次导入数据到Elasticsearch的时候清空原有数据,全量导入/重建索引的时候需要设置为true,同步增量的时候必须为false
      43. "discovery":true, # 是否自动发信,设置为true
      44. "batchSize": 100, # 每批导出的格式
      45. "column": [ # Elasticsearch中的列名,顺序和Reader中的Column顺序一致
      46. {
      47. "name": "uid", # Table Store中的主键列是uid,这里也有同名uid,用typeid表示这一列是主键
      48. "type": "id" # id表示这一列是主键,id不是Elasticsearch的内置类型,是ElasticsearchWriter提供的虚拟类型
      49. },
      50. {
      51. "name": "name", # 对应于Table Store中的属性列:name
      52. "type": "text" # 文本类型,采用默认分词
      53. },
      54. {
      55. "name": "phone", # 对应于Table Store中的属性列:phone
      56. "type": "text" # 文本类型,采用默认分词
      57. }
      58. ]
      59. }
      60. }
      61. }
      62. ]
      63. }
      64. }
  3. 完成配置文件后,执行DataX命令。

    1. python /home/admin/datax3/bin/datax.py -j"-Xms4g -Xmx4g" /path/to/job_config.json
  4. 检查数据是否已经导入到Elasticsearch中。

    通过如下命令查看导入Elasticsearch中的文档数:

    1. curl -XGET https://endpoint/index_name/type_name/_count?pretty -d '
    2. {
    3. "query": {
    4. "match_all": {}
    5. }
    6. }'

    返回结果类似如下所示:

    1. {
    2. "count" : 1000, # Elasticsearchindex_name索引的type_name类型中的doc
    3. "_shards" : { # 这个是Elasticsearch返回数据相关的meta值,表示总共有5shard,全部成功返回了结果
    4. "total" : 5,
    5. "successful" : 5,
    6. "skipped" : 0,
    7. "failed" : 0
    8. }
    9. }

    如果以上数据准确,则可以配置定时导出任务。

  5. 配置定时导出任务。

    因为使用了DataX,您需要自己通过脚本来控制调度。简单的控制流程如下所示:

    1. 构造一个配置文件模板,配置文件模板中的startTimestampMillis和endTimestampMillis设置为两个变量,例如,{start_time}和{end_time}。

    2. 实现一个脚本:run_datax.py

      1. 每隔5分钟执行一次。

      2. 执行完成后记录本次执行的起始时间last_time。

      3. 开始执行的时候获取上次执行的起始时间,然后设置start_time=last_time+ 5分钟,end_time= last_time+10分钟。

      4. 通过命令python /home/admin/datax3/bin/datax.py -j"-Xms4g -Xmx4g" -D start_time =xxxxx -D end_time =yyyyy /path/to/job_config启动一次DataX同步任务。

本文导读目录