全部产品
阿里云办公

增量同步(脚本模式)

更新时间:2017-12-04 19:13:52

数据集成(Data Integration)产品提供数据同步服务,有向导模式脚本模式两种方式。向导模式更简单,脚本模式更灵活。

本章介绍如何将Table Store中的增量数据通过数据集成的脚本模式同步到OpenSearch中。

途径

  • 数据集成脚本模式

    • Reader:OTSStreamReader
    • Writer:OSSWriter

配置Table Store

无需配置。

配置OSS

无需配置。

配置数据集成

  1. 创建Table Store数据源。

    说明:

    • 如果已经创建了Table Store的数据源,可以跳过这一步。
    • 如果您不希望创建数据源,也可以在后续的配置页面中配置相应的endpoint、instanceName、AccessKeyID和AccessKeySecret。

    创建数据源的具体步骤,请参见创建Table Store数据源

  2. 创建OSS数据源。

    本操作与上一个步骤类似,只是选择OSS作为数据源。

    说明:配置OSS数据源的参数时,注意Endpoint不包括bucketName。详细信息参见OSS endpoint介绍

  3. 创建同步任务。

    1. 登录数据集成控制台

    2. 同步任务页面,选择脚本模式

      脚本模式

    3. 在弹出的导入模板对话框中,来源类型选择OTS Stream目标类型选择OSS

    4. 单击确认,进入配置页面。

  4. 完善配置项。

    1. 在配置界面,已经提前嵌入了OTSStreamReader和OSSWriter的模板,请参考以下解释完成配置。

      1. {
      2. "type": "job",
      3. "version": "1.0",
      4. "configuration": {
      5. "setting": {
      6. "errorLimit": {
      7. "record": "0" # 允许出错的个数,当错误超过这个数目的时候同步任务会失败。
      8. },
      9. "speed": {
      10. "mbps": "1", # 每次同步任务的最大流量。
      11. "concurrent": "1" # 每次同步任务的并发度。
      12. }
      13. },
      14. "reader": {
      15. "plugin": "otsstream", # Reader插件的名称。
      16. "parameter": {
      17. "datasource": "", # Table Store的数据源名称,如果有此项则不再需要配置endpointaccessIdaccessKeyinstanceName
      18. "dataTable": "", # TableStore中的表名。
      19. "statusTable": "TableStoreStreamReaderStatusTable", # 存储TableStore Stream状态的表,一般不需要修改。
      20. "startTimestampMillis": "", # 开始导出的时间点,由于是增量导出,需要循环启动此任务,则这里每次启动的时候的时间都不一样,这里需要设置一个变量,比如${start_time}。
      21. "endTimestampMillis": "", # 结束导出的时间点。这里也需要设置一个变量,比如${end_time}。
      22. "date": "yyyyMMdd", # 导出哪一天的数据,功能和startTimestampMillisendTimestampMillis重复,这一项需要删除。
      23. "mode": "single_version_and_update_only", # TableStore Stream导出数据的格式,目前需要设置成:single_version_and_update_only。如果配置模板中没有则需要增加。
      24. "column":[ # 需要导出TableStore中的哪些列到OSS中去,如果配置模板中没有则需要增加,具体配置个数由用户自定义设置
      25. {
      26. "name": "uid" # 列名,这个是Table Store中的主键
      27. },
      28. {
      29. "name": "name" # 列名,这个是Table Store中的属性列。
      30. },
      31. ],
      32. "isExportSequenceInfo": false, # single_version_and_update_only 模式下只能是false
      33. "maxRetries": 30 # 最大重试次数。
      34. }
      35. },
      36. "writer": {
      37. "plugin": "oss", # Writer插件的名称
      38. "parameter": {
      39. "datasource": "", # OSS的数据源名称
      40. "object": "", # 最后备份到OSS的文件名的前缀,建议Table Store实例名/表名/date。比如"instance/table/{date}"
      41. "writeMode": "truncate", # 支持truncate|append|nonConflicttruncate会清理已存在的同名文件;append会加到已存在的同名文件内容后面;nonConflict会报错当同名文件存在时。
      42. "fileFormat": "csv", # 文件类型
      43. "encoding": "UTF-8", # 编码类型
      44. "nullFormat": "null", # 当遇到控制时,在文本中如何表示
      45. "dateFormat": "yyyy-MM-dd HH:mm:ss", # # 时间格式
      46. "fieldDelimiter": "," # 每一列的分隔符
      47. }
      48. }
      49. }
      50. }

      说明:详细的配置项解释请参见配置OTSStreamReader配置OSSWriter

    2. 单击保存,保存任务。

  5. 运行任务。

    1. 单击页面上方的运行

    2. 在弹出的配置框中,配置变量参数。

    3. 单击确认后开始运行任务。

    4. 运行结束后登录OSS控制台检查是否成功备份文件。

  6. 配置调度。

    1. 单击提交

      提交

    2. 在弹出的对话框中,配置各项参数。

      调度参数

      参数说明如下:

      参数描述
      调度类型选择周期调度
      自动重跑如果勾选,则当失败的时候会默认重试3次,每次间隔2分钟。
      生效日期使用默认值。默认从1970-01-01到一百年后。
      调度周期选择分钟
      起始时间选择00:00至23:59,表示全天24小时都需要调度。
      时间间隔选择5分钟
      start_time输入$[yyyymmddhh24miss-10/24/60],表示调度时间减去10分钟。
      end_time输入$[yyyymmddhh24miss-5/24/60],表示调度时间减去5分钟。
      date输入${bdp.system.bizdate},表示调度日期。
      依赖属性如果有依赖则填写,没有则不用填。
      跨周期依赖选择自依赖,等待上一调度周期结束,才能继续运行
    3. 单击确认

      周期性的同步任务配置完成,当前配置文件显示为只读状态

  7. 查看任务。

    1. 单击页面上方的运维中心

      运维中心

    2. 在左侧导航栏,选择任务列表 > 周期任务,可以查看新创建的同步任务。

    3. 新建的任务会从第二天00点开始执行。

      • 在左侧导航栏中,选择任务运维 > 周期实例,查看每一个预创建的当天同步任务,每个任务相隔5分钟,每个任务处理过去10~5分钟的数据。

      • 单击实例名称,可以查看详情。

    4. 单个任务在运行中或运行结束后,可以查看日志。

  8. 检查导出到OSS中的数据。

    登录OSS控制台,查看是否生成了新的文件,文件内容是否正确。

至此,Table Store数据可以在延迟5~10分钟的基础上自动同步到OSS中了。

本文导读目录