全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
表格存储

增量同步(脚本模式)

更新时间:2017-12-04 19:19:25

数据集成(Data Integration)产品提供数据同步服务,有脚本模式向导模式两种方式。脚本模式更灵活,向导模式更简单。

本节介绍如何将Table Store中的增量数据通过数据集成的脚本模式同步到Elasticsearch中。

限制

概述中的限制以外,增量同步还有如下限制:

  • Table Store数据变化类型

    • 仅支持PUT(新增),UPDATE(更新)两种操作。
    • 不支持DELETE操作。
  • 增量同步任务延时

    • Table Store Stream是实时增量通道,但是调度采用数据集成,最小调度时间是5分钟。并且,Table Store Stream Reader的DataX插件限制了最快只能处理5分钟前的数据,所以目前同步任务会有5~10分钟的延迟。

步骤1. 创建Table Store数据源

说明:

  • 如果已经创建了Table Store的数据源,可以跳过此步骤。
  • 如果您不希望创建数据源,也可以在后续的配置页面中配置相应的endpoint、instanceName、AccessKeyID和AccessKeySecret。

创建数据源的具体步骤,请参见创建Table Store数据源

步骤2. 创建同步任务

  1. 登录数据集成控制台

  2. 同步任务页面,选择脚本模式

    脚本模式

  3. 在弹出的导入模板对话框中,来源类型选择OTS Stream目标类型选择Elasticsearch

  4. 单击确认,进入配置页面。

步骤3. 完善配置项

  1. 在配置界面,已经提前嵌入了OTSStreamReader和ElasticsearchWriter的模板,请参考以下注释完成配置。

    1. {
    2. "type": "job",
    3. "version": "1.0",
    4. "configuration": {
    5. "setting": {
    6. "errorLimit": {
    7. "record": "0" # 允许出错的个数,当错误超过这个数目的时候同步任务会失败。
    8. },
    9. "speed": {
    10. "mbps": "1", # 每次同步任务的最大流量。
    11. "concurrent": "1" # 每次同步任务的并发度。
    12. }
    13. },
    14. "reader": {
    15. "plugin": "otsstream", # Reader插件的名称。
    16. "parameter": {
    17. "endpoint": "", # TableStore中实例的endpoint
    18. "accessId": "", # 阿里云的AccessKeyID
    19. "accessKey": "", # 阿里云的AccessKeySecret
    20. "instanceName": "", # TableStore的实例名,如果使用DataSource,则需要新增配置项datasource,不再需要配置endpointaccessIdaccessKeyinstanceName
    21. "dataTable": "", # TableStore中的表名。
    22. "statusTable": "TableStoreStreamReaderStatusTable", # 存储TableStore Stream状态的表,一般不需要修改。
    23. "startTimestampMillis": "", # 开始导出的时间点。
    24. "endTimestampMillis": "", # 结束导出的时间点。
    25. "date": "yyyyMMdd", # 导出哪一天的数据,功能和startTimestampMillisendTimestampMillis重复,这一项需要删除。
    26. "mode": "single_version_and_update_only", # TableStore Stream导出数据的格式,目前ElasticSearch只能接收这种格式的,这个不需要修改。如果配置模板中没有则需要增加。
    27. "column":[ # 需要导出TableStore中的哪些列到ElasticSearch中去,如果配置模板中没有则需要增加。
    28. {"name":"uid"},
    29. {"name":"name"},
    30. {"name":"phone"}
    31. ],
    32. "isExportSequenceInfo": false, # # single_version_and_update_only 模式下只能是false
    33. "maxRetries": 30 # 最大重试次数。
    34. }
    35. },
    36. "writer": {
    37. "plugin": "elasticsearch", # Writer插件的名称:ElasticSearchWriter,不需要修改。
    38. "parameter": {
    39. "endpoint": "",# ElasticSearchendpoint,控制台上有。
    40. "accessId": "",# 如果使用了X-PACK插件,则这里需要填写username,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,这里需要填写username
    41. "accessKey": "", # 如果使用了X-PACK插件,则这里需要填写password,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,这里需要填写password
    42. "index": "", # ElasticSearch的索引名称,如果之前没有,插件会自动创建。
    43. "indexType": "", # ElasticSearch中相应索引下的类型名称
    44. "cleanup": true, # 是否在每次导入数据到ElasticSearch的时候清空原有数据,全量导入/重建索引的时候需要设置为true,同步增量的时候必须为false,这里因为是同步,则需要设置为false
    45. "discovery": false, # 是否自动发现,设置为true
    46. "batchSize": 1000, # 每批导出的个数
    47. "splitter": ",", # 如果插入数据是array,就使用指定分隔符。
    48. "column": [ # ElasticSearch中的列名,顺序和Reader中的Column顺序一致
    49. {
    50. "name": "uid", # TableStore中的主键列是uid,这里也有同名uid,用typeid表示这一列是主键
    51. "type": "id" # id表示这一列是主键,id不是ElasticSearch的内置类型,是ElasticSearchWriter提供的虚拟类型
    52. },
    53. {
    54. "name": "name", # 对应于TableStore中的属性列:name
    55. "type": "text" # 文本类型,采用默认分词
    56. }
    57. ]
    58. }
    59. }
    60. }
    61. }

    说明:其他配置的解释请参见ElasticsearchWriter配置项

  2. 单击保存,保存任务。

步骤4. 运行任务(测试)

  1. 单击运行,开始执行任务。

    运行

    如果在配置里面有变量,比如存在${date},则会出现变量设置界面,只能设置具体值,如下图所示。

    自定义变量参数

  2. 查看页面下方的日志。

    如果日志中没有报错,则说明执行成功。

步骤5. 配置调度资源

  1. 登录DataWorks控制台

  2. 调度资源列表页面,单击新增调度资源

    新增调度资源

  3. 在弹出的对话框中,输入资源名称,并选择归属项目

  4. 在添加成功的页面,单击管理服务器

  5. 管理服务器对话框中,单击增加服务器

    增加服务器

  6. 配置服务器参数,如下所示。

    配置参数

    参数说明如下:

    参数描述
    网络类型选择专有网络
    ECS UUID已购买的ECS的UUID。
    UUID获取方式:登录ECS,以root身份执行命令bash dmidecode | grep UUID
    机器IP已购买的ECS的内网IP。
  7. 调度资源添加完成后,单击服务器初始化,并且按照提示步骤进行初始化。

    初始化

  8. 单击服务器管理,并在弹出的对话框中单击刷新

    服务状态显示为正常,表示调度资源配置成功。

    状态正常

步骤6. 提交任务

  1. 登录数据集成控制台

  2. 同步任务页面,双击刚刚创建的同步任务。

    双击

  1. 单击任务上方的提交按钮。

  2. 在弹出的对话框中,配置调度参数。参数说明如下:

    参数描述
    调度类型选择周期调度
    自动重跑勾选。表示当失败的时候会默认重试3次,每次间隔2分钟。
    生效日期推荐使用默认值,默认从1970-01-01到一百年后。
    调度周期选择分钟
    起始时间选择00:00至23:59,表示全天24小时都需要调度。
    时间间隔选择最小周期5分钟
    start_time输入$[yyyymmddhh24miss-10/24/60],表示调度时间减去10分钟。
    end_time输入$[yyyymmddhh24miss-5/24/60],表示调度时间减去5分钟。
    date输入${bdp.system.bizdate},表示调度日期。
    依赖属性如果有依赖则填写,没有则不用填。
    跨周期依赖选择自依赖,等待上一调度周期结束,才能继续运行
  3. 单击确认

    周期性的同步任务配置完成,当前配置文件显示为只读状态

步骤7. 绑定调度资源

  1. 单击页面上方的运维中心

  2. 在左侧导航栏,选择任务列表 > 周期任务,可以查看新创建的同步任务。

  3. 在该任务的右侧选择修改资源组

    修改资源组

  4. 在弹出的对话框中,选择资源组,即刚刚创建的调度资源。

    选择资源组

  5. 单击确认,绑定成功。

至此,TableStore数据可以在延迟5~10分钟的基础上自动同步到Elasticsearch中了。

步骤8. 验证结果

周期任务从第二天的00:00点开始执行。执行完一个任务后,在ECS上通过如下命令查看Elasticsearch中的数据量:

  1. curl -XGET http://endpoint/index_name/type_name/_count?pretty -d '
  2. {
  3. "query": {
  4. "match_all": {}
  5. }
  6. }'

返回类似如下结果:

  1. {
  2. "count" : 1000, # ElasticSearch中index_name索引的type_name类型中的doc数
  3. "_shards" : { # 这个是ElasticSearch返回数据相关的meta值,表示总共有5个shard,全部成功返回了结果
  4. "total" : 5,
  5. "successful" : 5,
  6. "skipped" : 0,
  7. "failed" : 0
  8. }
  9. }
本文导读目录