全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
表格存储

增量同步(脚本模式)

更新时间:2017-11-21 10:50:48

数据集成(Data Integration)产品提供数据同步服务,有向导模式脚本模式两种方式。向导模式更简单,脚本模式更灵活。

本节介绍如何将Table Store中的增量数据通过数据集成的脚本模式同步到OpenSearch中。

途径

  • 数据集成脚本模式

    • Reader:OTSStreamReader
    • Writer:OpenSearchWriter

限制

同步任务延迟:

Table Store Stream是实时增量通道,但是调度采用的是数据集成,最小调度时间是5分钟。并且,Table Store Stream Reader的dataX插件限制了最快只能处理5分钟前的数据,所以目前同步任务会有5~10分钟的延迟。

步骤1. 创建Table Store数据源

说明:如果已经创建了Table Store的数据源,可以跳过这一步。

创建数据源的具体步骤,请参见创建Table Store数据源

步骤2. 创建同步任务

  1. 登录数据集成控制台

  2. 同步任务页面,选择脚本模式

    脚本模式

  3. 在弹出的导入模板对话框中,来源类型选择OTS Stream目标类型选择OpenSearch

  4. 单击确认,进入配置页面。

步骤3. 完善配置项

  1. 在配置界面,已经提前嵌入了otsstreamreader和opensearchwriter的模板,请参考以下解释完成配置。

    1. {
    2. "type": "job", # 不能修改
    3. "version": "1.0", # 不能修改
    4. "configuration": {
    5. "setting": {
    6. "errorLimit": {
    7. "record": "0" # 超过record个错误后,导入任务会失败。
    8. },
    9. "speed": {
    10. "mbps": "1", # 导入速率,单位是MB
    11. "concurrent": "1" # 并发度。
    12. }
    13. },
    14. "reader": {
    15. "plugin": "otsstream", # 不能修改
    16. "parameter": {
    17. "datasource": "", # 数据源名称
    18. "dataTable": "", # TableStore中的表名
    19. "statusTable": "TableStoreStreamReaderStatusTable", # 导入任务的状态表,不需要修改
    20. "startTimeString ": "${start_time}", # 导入的数据的起始时间,由于需要周期性的调度,每次调度的时间值不一样,这里使用变量:start_time
    21. "endTimeString ": "${end_time}"", 导入的数据的结束时间,由于需要周期性的调度,每次调度的时间值不一样,这里使用变量:end_time
    22. "mode": "single_version_and_update_only", # mode必须配置为single_version_and_update_only
    23. "column":[
    24. {"name":"column1"}, # Table Store中列名,此列需要实时导入到OpenSearch
    25. {"name":"column2"} # Table Store中列名,此列需要实时导入到OpenSearch
    26. ],
    27. "isExportSequenceInfo": false, # 是否导出时序信息,一般为false
    28. "maxRetries": 30 # 最大重试次数,建议设置的更大一些
    29. }
    30. },
    31. "writer": {
    32. "plugin": "opensearch", # 不能修改
    33. "parameter": {
    34. "endpoint": "", # OpenSearch中的实例的endpoint,类似于https://opensearch-cn-hangzhou.aliyuncs.com
    35. "accessId": "", # 阿里云的AccessKeyID
    36. "accessKey": "", # 阿里云的AccessKeySecret
    37. "host": "", # 配置同OpenSearchendpoint,类似于https://opensearch-cn-hangzhou.aliyuncs.com
    38. "indexName": "", # OpenSearch中的实例名
    39. "table": "", # OpenSearch中配置的表名
    40. "column": [ # OpenSearch中配置的列名
    41. "col1", # 按顺序对应otsreader中的列名,这里的col1对应于Table Store中的column1
    42. "col2" # 按顺序对应otsreader中的列名,这里的col2对应于Table Store中的column2
    43. ],
    44. "batchSize": "500", # 一次写入的个数
    45. "writeMode": "update", # 写入模式,支持addupdate
    46. "skipDirtyRecord": "true", # 是否跳过脏数据,比如格式不对的记录。
    47. "ignoreWriteError": "false" # 如果出错的时候,是否忽略
    48. }
    49. }
    50. }
    51. }
  2. 单击保存,保存任务。

保存成功后可以执行如下操作:

  • 直接运行。
  • 配置调度。本示例中,需要配置一个周期性的同步任务,所以下一步选择配置调度。

    可以单击运行测试配置项,但是测试的时候不能在配置里面配置调度变量,必须指定确切的值。

步骤4. 配置调度

  1. 单击提交

    提交

  2. 在弹出的对话框中,配置各项参数。参数说明如下:

    参数描述
    调度类型选择周期调度
    自动重跑如果勾选,则当失败的时候会默认重试3次,每次间隔2分钟。
    生效日期默认是1970-01-01到一百年后,可以自定义配置。
    调度周期选择分钟
    起始时间选择00:00至23:59,表示全天24小时都需要调度。
    时间间隔选择最小周期5分钟
    依赖属性如果有依赖则填写,没有则不用填。
    跨周期依赖选择自依赖,等待上一调度周期结束,才能继续运行
  3. 单击确认

    周期性的同步任务配置完成,当前配置文件显示为只读状态

步骤5. 查看任务

  1. 单击页面上方的运维中心

    运维中心

  2. 在左侧导航栏,选择任务列表 > 周期任务,可以查看新创建的同步任务。

    周期任务

  3. 新建的任务会从第二天00点开始执行。

    • 在左侧导航栏中,选择任务运维 > 周期实例,查看每一个预创建的当天同步任务,每个任务相隔5分钟,每个任务处理过去10~5分钟的数据。

    • 单击实例名称,可以查看详情。

  4. 单个任务在运行中或运行结束后,可以查看日志。

至此,TableStore数据可以在延迟5~10分钟的基础上自动同步到OpenSearch中了。

步骤6. 检查导出到OpenSearch中的数据

  1. 登录OpenSearch控制台

  2. 选择导出数据的应用名称,单击管理进入管理页面。

  3. 基本信息页面,查看文档总数,判断是否和自己导入的数据量一致。

    OpenSearch中的文档数如果超过100万,则结果为预估值。只要OpenSearch中有结果,且数据集成导出任务没有报错,则导出任务成功执行完成。

配置示例

  • 目标:使用Table Store存储完整数据,使用OpenSearch对name和phone_number字段进行模糊查询。

  • Table Store信息

    • 实例名:school
    • 表名:user_info
    • 表的数据结构如下:
第一列主键 属性列 属性列 属性列
uid name phone_number content
  • OpenSearch信息

    • 索引名:user_index
    • user_index使用的数据源表:user_table
    • 字段:uid,name和phone_number。

详细的配置示例如下所示:

  1. {
  2. "type": "job",
  3. "version": "1.0",
  4. "configuration": {
  5. "setting": {
  6. "errorLimit": {
  7. "record": "0"
  8. },
  9. "speed": {
  10. "mbps": "1",
  11. "concurrent": "1"
  12. }
  13. },
  14. "reader": {
  15. "plugin": "otsstream",
  16. "parameter": {
  17. "datasource": "", # 数据源名称
  18. "dataTable": "user_info", # Table Store中实例school中的表名
  19. "statusTable": "TableStoreStreamReaderStatusTable",
  20. "startTimeString ": "${start_time}", # 开始时间变量
  21. "endTimeString ": "${end_time}", # 结束时间变量
  22. "mode": "single_version_and_update_only", # mode必须配置为single_version_and_update_only
  23. "column":[
  24. {"name":"uid"}, # Table Store中列名,此列需要实时导入到OpenSearch
  25. {"name":"name"}, # Table Store中列名,此列需要实时导入到OpenSearch
  26. {"name":"phone_number"} # TableStore中列名,此列需要实时导入到OpenSearch
  27. ],
  28. "isExportSequenceInfo": false,
  29. "maxRetries": 30
  30. }
  31. },
  32. "writer": {
  33. "plugin": "opensearch",
  34. "parameter": {
  35. "endpoint": "https://opensearch-cn-hangzhou.aliyuncs.com",
  36. "accessId": "OyRxxxxxxxxxxaXi",
  37. "accessKey": "Z3wVAaLxxxxxxxxxxxxxxxxAOdbJRZ",
  38. "host": "https://opensearch-cn-hangzhou.aliyuncs.com",
  39. "indexName": "user_index", # OpenSearch中的应用名
  40. "table": "user_table", # user_index应用的源表名
  41. "column": [
  42. "uid", # user_table表中的列名,这一列对应TableStore中的uid,必须在创建应用user_index的时候设置改列为主键
  43. "name",
  44. "phone_number"
  45. ],
  46. "batchSize": "500",
  47. "writeMode": "add",
  48. "skipDirtyRecord": "true",
  49. "ignoreWriteError": "false"
  50. }
  51. }
  52. }
  53. }
本文导读目录