数据集成(Data Integration)产品提供数据同步服务,有向导模式和脚本模式两种方式。向导模式更简单,脚本模式更灵活。

本章介绍如何将Table Store中的增量数据通过数据集成的脚本模式同步到OpenSearch中。

途径

数据集成脚本模式

  • Reader:OTSStreamReader
  • Writer:OSSWriter

配置Table Store

无需配置。

配置OSS

无需配置。

配置数据集成

  1. 创建Table Store数据源。
    说明
    • 如果已经创建了Table Store的数据源,可以跳过这一步。
    • 如果您不希望创建数据源,也可以在后续的配置页面中配置相应的endpoint、instanceName、AccessKeyID和AccessKeySecret。

    创建数据源的具体步骤,请参见创建Table Store数据源

  2. 创建OSS数据源。
    本操作与上一个步骤类似,只是选择OSS作为数据源。
    说明 配置OSS数据源的参数时,注意Endpoint不包括bucketName。
  3. 创建同步任务。
    1. 登录数据集成控制台
    2. 同步任务页面,选择脚本模式
    3. 在弹出的导入模板对话框中,来源类型选择OTS Stream,目标类型选择OSS。
    4. 单击确认,进入配置页面。
  4. 完善配置项。
    1. 在配置界面,已经提前嵌入了OTSStreamReader和OSSWriter的模板,请参考以下解释完成配置。
      {
      "type": "job",
      "version": "1.0",
      "configuration": {
      "setting": {
      "errorLimit": {
       "record": "0"  # 允许出错的个数,当错误超过这个数目的时候同步任务会失败。
      },
      "speed": {
       "mbps": "1",  # 每次同步任务的最大流量。
       "concurrent": "1"   # 每次同步任务的并发度。
      }
      },
      "reader": {
      "plugin": "otsstream",  # Reader插件的名称。
      "parameter": {
       "datasource": "", # Table Store的数据源名称,如果有此项则不再需要配置endpoint,accessId,accessKey和instanceName。
       "dataTable": "", # TableStore中的表名。
       "statusTable": "TableStoreStreamReaderStatusTable", # 存储TableStore Stream状态的表,一般不需要修改。
       "startTimestampMillis": "",  # 开始导出的时间点,由于是增量导出,需要循环启动此任务,则这里每次启动的时候的时间都不一样,这里需要设置一个变量,比如${start_time}。
       "endTimestampMillis": "",  # 结束导出的时间点。这里也需要设置一个变量,比如${end_time}。
       "date": "yyyyMMdd",  # 导出哪一天的数据,功能和startTimestampMillis、endTimestampMillis重复,这一项需要删除。
       "mode": "single_version_and_update_only", # TableStore Stream导出数据的格式,目前需要设置成:single_version_and_update_only。如果配置模板中没有则需要增加。
       "column":[  # 需要导出TableStore中的哪些列到OSS中去,如果配置模板中没有则需要增加,具体配置个数由用户自定义设置
                {
                   "name": "uid"  # 列名,这个是Table Store中的主键
                },
                {
                   "name": "name"  # 列名,这个是Table Store中的属性列。
                 },
       ],
       "isExportSequenceInfo": false, # single_version_and_update_only 模式下只能是false。
       "maxRetries": 30 # 最大重试次数。
      }
      },
      "writer": {
      "plugin": "oss", # Writer插件的名称
      "parameter": {
       "datasource": "", # OSS的数据源名称
       "object": "",  # 最后备份到OSS的文件名的前缀,建议Table Store实例名/表名/date。比如"instance/table/{date}"
       "writeMode": "truncate", # 支持truncate|append|nonConflict,truncate会清理已存在的同名文件;append会加到已存在的同名文件内容后面;nonConflict会报错当同名文件存在时。
       "fileFormat": "csv", # 文件类型
       "encoding": "UTF-8", # 编码类型
       "nullFormat": "null", # 当遇到控制时,在文本中如何表示
       "dateFormat": "yyyy-MM-dd HH:mm:ss", # # 时间格式
       "fieldDelimiter": "," # 每一列的分隔符
      }
      }
      }
      }
      说明 详细的配置项解释请参见配置OTSStreamReader配置OSSWriter
    2. 单击保存,保存任务。
  5. 运行任务。
    1. 单击页面上方的运行
    2. 在弹出的配置框中,配置变量参数。
    3. 单击确认后开始运行任务。
    4. 运行结束后登录OSS控制台检查是否成功备份文件。
  6. 配置调度。
    1. 单击提交
    2. 在弹出的对话框中,配置各项参数。

      参数说明如下:

      参数 描述
      调度类型 选择周期调度
      自动重跑 如果勾选,则当失败的时候会默认重试3次,每次间隔2分钟。
      生效日期 使用默认值。默认从1970-01-01到一百年后。
      调度周期 选择分钟。
      起始时间 选择00:00至23:59,表示全天24小时都需要调度。
      时间间隔 选择5分钟。
      start_time 输入$[yyyymmddhh24miss-10/24/60],表示调度时间减去10分钟。
      end_time 输入$[yyyymmddhh24miss-5/24/60],表示调度时间减去5分钟。
      date 输入${bdp.system.bizdate},表示调度日期。
      依赖属性 如果有依赖则填写,没有则不用填。
      跨周期依赖 选择自依赖,等待上一调度周期结束,才能继续运行。
    3. 单击确认

      周期性的同步任务配置完成,当前配置文件显示为只读状态。

  7. 查看任务。
    1. 单击页面上方的运维中心
    2. 在左侧导航栏,选择任务列表 > 周期任务,可以查看新创建的同步任务。
    3. 新建的任务会从第二天00点开始执行。
      • 在左侧导航栏中,选择任务运维 > 周期实例,查看每一个预创建的当天同步任务,每个任务相隔5分钟,每个任务处理过去10~5分钟的数据。
      • 单击实例名称,可以查看详情。

    4. 单个任务在运行中或运行结束后,可以查看日志。
  8. 检查导出到OSS中的数据。

    登录OSS控制台,查看是否生成了新的文件,文件内容是否正确。

至此,Table Store数据可以在延迟5~10分钟的基础上自动同步到OSS中了。