数据集成(Data Integration)产品提供数据同步服务,有脚本模式和向导模式两种方式。脚本模式更灵活,向导模式更简单。

本节介绍如何将Table Store中的全量数据通过数据集成的脚本模式同步到Elasticsearch中。

限制

概述中的限制以外,增量同步还有如下限制:

  • Table Store数据变化类型

    • 仅支持PUT(新增),UPDATE(更新)两种操作。
    • 不支持DELETE操作。
  • 增量同步任务延时

    • Table Store Stream是实时增量通道,但是调度采用数据集成,最小调度时间是5分钟。并且,Table Store Stream Reader的DataX插件限制了最快只能处理5分钟前的数据,所以目前同步任务会有5~10分钟的延迟。

步骤1. 创建Table Store数据源

说明
  • 如果已经创建了Table Store的数据源,可以跳过此步骤。
  • 如果您不希望创建数据源,也可以在后续的配置页面中配置相应的endpoint、instanceName、AccessKeyID和AccessKeySecret。

创建数据源的具体步骤,请参见创建Table Store数据源

步骤2. 创建同步任务

  1. 登录数据集成控制台
  2. 同步任务页面,选择脚本模式
  3. 在弹出的导入模板对话框中,来源类型选择OTS,目标类型选择Elasticsearch。
  4. 单击确认,进入配置页面。

完善配置项

  1. 在配置界面,已经提前嵌入了OTSStreamReader和ElasticsearchWriter的模板,请参考以下注释完成配置。
    {
    "type": "job",
    "version": "1.0",
    "configuration": {
     "setting": {
       "errorLimit": {
         "record": "0"  # 允许出错的个数,当错误超过这个数目的时候同步任务会失败。
       },
       "speed": {
         "mbps": "1",  # 每次同步任务的最大流量。
         "concurrent": "1" # 每次同步任务的并发度。
       }
     },
     "reader": {
       "plugin": "otsstream",  # Reader插件的名称。
       "parameter": {
         "endpoint": "",  # TableStore中实例的endpoint。
         "accessId": "",  # 阿里云的AccessKeyID。
         "accessKey": "",  # 阿里云的AccessKeySecret。
         "instanceName": "",  # TableStore的实例名,如果使用DataSource,则需要新增配置项datasource,不再需要配置endpoint,accessId,accessKey和instanceName。
         "dataTable": "",  # TableStore中的表名。
         "statusTable": "TableStoreStreamReaderStatusTable", # 存储TableStore Stream状态的表,一般不需要修改。
         "startTimestampMillis": "", # 开始导出的时间点。
         "endTimestampMillis": "",   # 结束导出的时间点。
         "date": "yyyyMMdd",  # 导出哪一天的数据,功能和startTimestampMillis、endTimestampMillis重复,这一项需要删除。
         "mode": "single_version_and_update_only", # TableStore Stream导出数据的格式,目前ElasticSearch只能接收这种格式的,这个不需要修改。如果配置模板中没有则需要增加。
         "column":[  # 需要导出TableStore中的哪些列到ElasticSearch中去,如果配置模板中没有则需要增加。
             {"name":"uid"},  
             {"name":"name"},           
             {"name":"phone"}
         ],
         "isExportSequenceInfo": false, #  # single_version_and_update_only 模式下只能是false。
         "maxRetries": 30 # 最大重试次数。
       }
     },
     "writer": {
       "plugin": "elasticsearch",  # Writer插件的名称:ElasticSearchWriter,不需要修改。
       "parameter": {
         "endpoint": "",# ElasticSearch的endpoint,控制台上有。
         "accessId": "",# 如果使用了X-PACK插件,则这里需要填写username,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,这里需要填写username。
         "accessKey": "", # 如果使用了X-PACK插件,则这里需要填写password,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,这里需要填写password。  
         "index": "",  # ElasticSearch的索引名称,如果之前没有,插件会自动创建。
         "indexType": "", # ElasticSearch中相应索引下的类型名称
         "cleanup": true, # 是否在每次导入数据到ElasticSearch的时候清空原有数据,全量导入/重建索引的时候需要设置为true,同步增量的时候必须为false,这里因为是同步,则需要设置为false。
         "discovery": false, # 是否自动发现,设置为true
         "batchSize": 1000, # 每批导出的个数
         "splitter": ",",  # 如果插入数据是array,就使用指定分隔符。
         "column": [  # ElasticSearch中的列名,顺序和Reader中的Column顺序一致
           {
             "name": "uid",  # TableStore中的主键列是uid,这里也有同名uid,用type:id表示这一列是主键
             "type": "id"  # id表示这一列是主键,id不是ElasticSearch的内置类型,是ElasticSearchWriter提供的虚拟类型
           },
           {
             "name": "name", # 对应于TableStore中的属性列:name
             "type": "text"  # 文本类型,采用默认分词
           }
         ]
       }
     }
    }
    }
    说明 其他配置的解释请参见 ElasticsearchWriter配置项
  2. 单击保存,保存任务。

步骤4. 运行任务(测试)

  1. 单击运行,开始执行任务。

    如果在配置里面有变量,比如存在${date},则会出现变量设置界面,只能设置具体值。

  2. 查看页面下方的日志。

    如果日志中没有报错,则说明执行成功。

步骤5. 配置调度资源

  1. 登录DataWorks控制台
  2. 调度资源列表页面,单击新增调度资源
  3. 在弹出的对话框中,输入资源名称,并选择归属项目
  4. 在添加成功的页面,单击管理服务器
  5. 管理服务器对话框中,单击增加服务器
  6. 配置服务器参数,参数说明如下:
    参数 描述
    网络类型 选择专有网络
    ECS UUID 已购买的ECS的UUID。

    UUID获取方式:登录ECS,以root身份执行命令bash dmidecode | grep UUID

    机器IP 已购买的ECS的内网IP。
  7. 调度资源添加完成后,单击服务器初始化,并且按照提示步骤进行初始化。
  8. 单击服务器管理,并在弹出的对话框中单击刷新

    服务状态显示为正常,表示调度资源配置成功。

步骤6. 提交任务

  1. 登录数据集成控制台
  2. 同步任务页面,双击刚刚创建的同步任务。

  3. 单击任务上方的提交按钮。
  4. 在弹出的对话框中,配置调度参数。参数说明如下:
    参数 描述
    调度类型 选择周期调度
    自动重跑 勾选。表示当失败的时候会默认重试3次,每次间隔2分钟。
    生效日期 推荐使用默认值,默认从1970-01-01到一百年后。
    调度周期 选择分钟。
    起始时间 选择00:00至23:59,表示全天24小时都需要调度。
    时间间隔 选择最小周期5分钟。
    start_time 输入$[yyyymmddhh24miss-10/24/60],表示调度时间减去10分钟。
    end_time 输入$[yyyymmddhh24miss-5/24/60],表示调度时间减去5分钟。
    date 输入${bdp.system.bizdate},表示调度日期。
    依赖属性 如果有依赖则填写,没有则不用填。
    跨周期依赖 选择自依赖,等待上一调度周期结束,才能继续运行。
  5. 单击确认

    周期性的同步任务配置完成,当前配置文件显示为只读状态。

步骤7. 绑定调度资源

  1. 单击页面上方的运维中心
  2. 在左侧导航栏,选择任务列表 > 周期任务 ,可以查看新创建的同步任务。
  3. 在该任务的右侧选择修改资源组

  4. 在弹出的对话框中,选择资源组,即刚刚创建的调度资源。

  5. 单击确认,绑定成功。

至此,Table Store数据可以在延迟5~10分钟的基础上自动同步到Elasticsearch中了。

步骤8. 验证结果

周期任务从第二天的00:00点开始执行。执行完一个任务后,在ECS上通过如下命令查看Elasticsearch中的数据量:

curl -XGET http://endpoint/index_name/type_name/_count?pretty  -d '
{
    "query": {
        "match_all": {}
    }
}'

返回类似如下结果:

{
  "count" : 1000,  # ElasticSearch中index_name索引的type_name类型中的doc数
  "_shards" : {     # 这个是ElasticSearch返回数据相关的meta值,表示总共有5个shard,全部成功返回了结果
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  }
}