数据集成(Data Integration)产品提供数据同步服务,有脚本模式和向导模式两种方式。脚本模式更灵活,向导模式更简单。

本章介绍如何将Table Store中的增量数据通过数据集成的脚本模式同步到OpenSearch中。

途径

数据集成脚本模式
  • Reader:OTSStreamReader
  • Writer:OpenSearchWriter

限制

同步任务延迟:

Table Store Stream是实时增量通道,但是调度采用的是数据集成,最小调度时间是5分钟。并且,Table Store Stream Reader的dataX插件限制了最快只能处理5分钟前的数据,所以目前同步任务会有5~10分钟的延迟。

步骤1. 创建Table Store数据源

说明 如果已经创建了Table Store的数据源,可以跳过这一步。

创建数据源的具体步骤,请参见创建Table Store数据源

步骤2. 创建同步任务

  1. 登录数据集成控制台
  2. 同步任务页面,选择脚本模式
  3. 在弹出的导入模板对话框中,来源类型选择OTS,目标类型选择OpenSearch。
  4. 单击确认,进入配置页面。

步骤3. 完善配置项

  1. 在配置界面,已经提前嵌入了otsstreamreader和opensearchwriter的模板,请参考以下解释完成配置。
    {
    "type": "job",  # 不能修改
    "version": "1.0",  # 不能修改
    "configuration": {
     "setting": {
       "errorLimit": {
         "record": "0"  # 超过record个错误后,导入任务会失败。
       },
       "speed": {
         "mbps": "1",  # 导入速率,单位是MB。
         "concurrent": "1"  # 并发度。
       }
     },
     "reader": {
       "plugin": "otsstream",  # 不能修改
       "parameter": {
         "datasource": "",  # 数据源名称
         "dataTable": "",  # TableStore中的表名
         "statusTable": "TableStoreStreamReaderStatusTable", # 导入任务的状态表,不需要修改
         "startTimeString ": "${start_time}",  # 导入的数据的起始时间,由于需要周期性的调度,每次调度的时间值不一样,这里使用变量:start_time
         "endTimeString ": "${end_time}"",  导入的数据的结束时间,由于需要周期性的调度,每次调度的时间值不一样,这里使用变量:end_time
         "mode": "single_version_and_update_only",  # mode必须配置为single_version_and_update_only
         "column":[
               {"name":"column1"},  # Table Store中列名,此列需要实时导入到OpenSearch
               {"name":"column2"}   # Table Store中列名,此列需要实时导入到OpenSearch
          ],
         "isExportSequenceInfo": false, # 是否导出时序信息,一般为false
         "maxRetries": 30  # 最大重试次数,建议设置的更大一些
       }
     },
     "writer": {
       "plugin": "opensearch",    # 不能修改
       "parameter": {
         "endpoint": "",    # OpenSearch中的实例的endpoint,类似于https://opensearch-cn-hangzhou.aliyuncs.com
         "accessId": "",     # 阿里云的AccessKeyID
         "accessKey": "",   # 阿里云的AccessKeySecret
         "host": "",            # 配置同OpenSearch的endpoint,类似于https://opensearch-cn-hangzhou.aliyuncs.com
         "indexName": "",  # OpenSearch中的实例名
         "table": "",           # OpenSearch中配置的表名
         "column": [          # OpenSearch中配置的列名
           "col1",               # 按顺序对应otsreader中的列名,这里的col1对应于Table Store中的column1
           "col2"                # 按顺序对应otsreader中的列名,这里的col2对应于Table Store中的column2
         ],
         "batchSize": "500",    # 一次写入的个数
         "writeMode": "update",   # 写入模式,支持add和update
         "skipDirtyRecord": "true",  # 是否跳过脏数据,比如格式不对的记录。
         "ignoreWriteError": "false"   # 如果出错的时候,是否忽略
       }
     }
    }
    }
  2. 单击保存,保存任务。

    保存成功后可以执行如下操作:

    • 直接运行。
    • 配置调度。本示例中,需要配置一个周期性的同步任务,所以下一步选择配置调度。

      可以单击运行测试配置项,但是测试的时候不能在配置里面配置调度变量,必须指定确切的值,比如“20180115120000”。

步骤4. 配置调度

  1. 单击提交
  2. 在弹出的对话框中,配置各项参数。参数说明如下:
    参数 描述
    调度类型 选择周期调度
    自动重跑 如果勾选,则当失败的时候会默认重试3次,每次间隔2分钟。
    生效日期 默认是1970-01-01到一百年后,可以自定义配置。
    调度周期 选择分钟。
    起始时间 选择00:00至23:59,表示全天24小时都需要调度。
    时间间隔 选择最小周期5分钟。
    startTime 输入$[yyyymmddhh24miss-10/24/60],表示调度时间减去10分钟。
    endTime 输入$[yyyymmddhh24miss-5/24/60],表示调度时间减去5分钟。
    依赖属性 如果有依赖则填写,没有则不用填。
    跨周期依赖 选择自依赖,等待上一调度周期结束,才能继续运行。
  3. 单击确认

    周期性的同步任务配置完成,当前配置文件显示为只读状态。

步骤5. 查看任务

  1. 单击页面上方的运维中心
  2. 在左侧导航栏,选择任务列表 > 周期任务,可以查看新创建的同步任务。
  3. 新建的任务会从第二天00点开始执行。
    • 在左侧导航栏中,选择任务运维 > 周期实例 ,查看每一个预创建的当天同步任务,每个任务相隔5分钟,每个任务处理过去10~5分钟的数据。

    • 单击实例名称,可以查看详情。

  4. 单个任务在运行中或运行结束后,可以查看日志。

至此,TableStore数据可以在延迟5~10分钟的基础上自动同步到OpenSearch中了。

步骤6. 检查导出到OpenSearch中的数据

  1. 登录OpenSearch控制台
  2. 选择导出数据的应用名称,单击管理进入管理页面。
  3. 基本信息页面,查看文档总数,判断是否和自己导入的数据量一致。

    OpenSearch中的文档数如果超过100万,则结果为预估值。只要OpenSearch中有结果,且数据集成导出任务没有报错,则导出任务成功执行完成。

配置示例

  • 目标:使用Table Store存储完整数据,使用OpenSearch对name和phone_number字段进行模糊查询。

  • Table Store信息

    • 实例名:school
    • 表名:user_info
    • 表的数据结构如下:
      第一列主键 属性列 属性列 属性列
      uid name phone_number content
  • OpenSearch信息
    • 索引名:user_index
    • user_index使用的数据源表:user_table
    • 字段:uid,name和phone_number
    详细的配置示例如下所示: