数据集成(Data Integration)产品提供数据同步服务,有脚本模式和向导模式两种方式。脚本模式更灵活,向导模式更简单。

本章介绍如何将Table Store中的全量数据通过数据集成的脚本模式导出到OpenSearch中。

途径

数据集成脚本模式
  • Reader:OTSReader
  • Writer:OpenSearchWriter

步骤1. 创建Table Store数据源

说明 如果已经创建了Table Store的数据源,可以跳过这一步。

创建数据源的具体步骤,请参见创建Table Store数据源

步骤2. 创建导出任务

  1. 登录数据集成控制台
  2. 同步任务页面,选择脚本模式
  3. 在弹出的导入模板对话框中,来源类型选择OTS,目标类型选择OpenSearch。
  4. 单击确认,进入配置页面。

步骤3. 完善配置项

  1. 在配置界面,已经提前嵌入了otsreader和opensearchwriter的模板,请参考以下解释完成配置。
    {
    "type": "job",    # 不能修改
    "version": "1.0",  # 不能修改
    "configuration": {   
     "setting": {
       "errorLimit": {
         "record": "0"  # 超过record个错误后,导入任务会失败。
       },
       "speed": {
         "mbps": "1",  # 导入速率,单位是MB。
         "concurrent": "1"  # 并发度。
       }
     },
     "reader": {
       "plugin": "ots",  # 不能修改
       "parameter": {
         "datasource": "",   # 数据集成中的数据源名称,需要提前设置好,这里有两种选择,一种是配置datasource数据源,一种写明文的AccessKeyID等鉴权信息,鼓励使用数据源。
         "table": "",    # Table Store中的表名
         "column": [   # 需要导出到OpenSearch中的列名
           {
             "name": "column1"    # Table Store中列名,此列需要导入到OpenSearch
           },
           {
             "name": "column2"   # Table Store中列名,此列需要导入到OpenSearch
           },
           {
             "name": "column3"   # Table Store中列名,此列需要导入到OpenSearch
           }
         ],
         "range": {
           "begin": [
             {
               "type": "INF_MIN"   # Table Store中第一列主键的起始位置。如果要导出全量,这里需要配置为INF_MIN,如何导出部分,则按需要配置。如果有几列主键列,这里begin中就需要几项配置,但是当导出目标为OpenSearch的时候仅支持单列主键,所以这里仅演示单列的配置
             }
           ],
           "end": [
             {
               "type": "INF_MAX"   # Table Store中第一列主键的结束位置。如果要导出全量,这里需要配置为INF_MAX,如何导出部分,则按需要配置。
             }
           ],
           "split": [  # 用来配置Table Store的表的分区信息,可以加速导出,下一个版本会自动处理。
           ]
         }
       }
     },
     "writer": {
       "plugin": "opensearch",  # 不能修改
       "parameter": {
         "endpoint": "",   # OpenSearch中的实例的endpoint。
         "accessId": "",   # 阿里云的AccessKeyID
         "accessKey": "",  # 阿里云的AccessKeySecret
         "host": "",   # 配置同OpenSearch的endpoint,类似于https://opensearch-cn-hangzhou.aliyuncs.com
         "indexName": "",  # OpenSearch中的实例名
         "table": "",  # OpenSearch中配置的表名
         "column": [  # OpenSearch中配置的列名
           "col1",   # 按顺序对应otsreader中的列名,这里的col1对应于Table Store中的column1
           "col2"   # 按顺序对应otsreader中的列名,这里的col2对应于Table Store中的column2
         ],
         "batchSize": "500",  # 一次写入的个数
         "writeMode": "add",   # 写入模式,支持add和update,全量导出的时候建议用add
         "skipDirtyRecord": "true", # 是否跳过脏数据,比如格式不对的记录。
         "ignoreWriteError": "false"  # 如果出错的时候,是否忽略
       }
     }
    }
    }
  2. 单击保存,保存任务。

步骤4. 运行任务

  1. 单击运行,开始执行任务。
  2. 查看页面下方的日志。

    如果日志中没有报错,则说明执行成功,您可以到目标OpenSearch中去检查数据了。

    说明 因为全量导出一般是一次性的,所以不需要配置自动调度参数。如果需要配置调度参数,请参考 增量同步中的调度参数配置。

步骤5. 检查导出到OpenSearch中的数据

  1. 登录OpenSearch控制台
  2. 选择导出数据的应用名称,单击管理进入管理页面。
  3. 基本信息页面,查看文档总数,判断是否和自己导入的数据量一致。

    OpenSearch中的文档数如果超过100万,则结果为预估值。只要OpenSearch中有结果,且数据集成导出任务没有报错,则导出任务成功执行完成。

配置示例

Table Store表的数据结构如下:

第一列主键 属性列 属性列 属性列 性别 属性列 属性列
uid name phone_number age sex content description

目标:将Table Store中的name和phone_number两个字段全量导出到OpenSearch便于搜索。

完整配置如下所示:

{
  "configuration": {
    "reader": {
      "plugin": "ots",
      "parameter": {
        "datasource": "source_name",  # 数据集成中的数据源
        "column": [
          {
            "name": "uid"  # Table Store中的主键列,导出到OpenSearch中后也必须是主键
          },
          {
            "name": "name"  # Table Store中的属性列,用于搜索字段
          },
          {
            "name": "phone_number"   # Table Store中的属性列,用于搜索字段
          }
        ],
        "range": {
          "begin": [
            {
              "type": "INF_MIN"  # 导出数据的开始范围
            }
          ],
          "end": [
            {
              "type": "INF_MAX"    # 导出数据的结束范围
            }
          ]
        },
        "table": "cdp"  # Table Store中需要导出的表名
      }
    },
    "writer": {
      "plugin": "opensearch",
      "parameter": {
        "accessId": "OyRxxxxxxxxxxaXi ",  # 阿里云AccessKeyID
        "endpoint": "https://opensearch-cn-hangzhou.aliyuncs.com",
        "accessKey": "Z3wVAaLxxxxxxxxxxxxxxxxAOdbJRZ ",  # 阿里云AccessKeySecret
        "indexName": "index_name",  # OpenSearch中的索引名称
        "table": "table_name", # OpenSearch中index_name中的table名称
        "host": "https://opensearch-cn-hangzhou.aliyuncs.com",
        "column": [   # OpenSearch中index_name中的table的列名,顺序和Reader中的Column顺序一致
          "uid",   # OpenSearch中的主键,对应于TableStore中uid
          "name",  # OpenSearch中的列名,对应于TableStore中的name
          "phone_number"  # OpenSearch中的列名,对应于TableStore中的phone_number
        ],
        "writeMode": "add",  # add模式
        "batchSize": "500",
        "skipDirtyRecord": "true", # 如果有文档错误跳过
        "version": "v3",
        "ignoreWriteError": "false",   
      }
    },
    "setting": {
      "errorLimit": {
        "record": "0"  # 允许出现的错误数,0表示不允许任何一行出错误
      },
      "speed": {
        "concurrent": "1",  # 并发度为1
        "mbps": "1"  # 每秒导出速率为1MB
      }
    }
  },
  "type": "job",
  "version": "1.0"
}