文档

将表格存储数据表中数据同步到另一个数据表

更新时间:

使用通道服务、DataWorks或者DataX将表格存储数据表中的数据同步到另一个数据表。

前提条件

已创建目标数据表,目标数据表的列必须与源数据表中待迁移的列一一对应。具体操作,请参见创建数据表

说明

如果要实现跨账号、跨地域数据迁移,请使用DataX工具通过互联网或者通过云企业网连通VPC进行操作。关于使用云企业网的具体操作,请参见云企业网快速入门

使用通道服务迁移同步

创建源数据表的通道后,使用SDK进行迁移同步。迁移过程中可以自定义业务处理逻辑对数据进行处理。

前提条件

  • 已确定要使用的Endpoint。具体操作,请参见初始化OTSClient

  • 已配置密钥。具体操作,请参见初始化OTSClient

  • 已将密钥配置到环境变量中。具体操作,请参见初始化OTSClient

    表格存储使用OTS_AK_ENV环境变量名表示阿里云账号或者RAM用户的AccessKey ID,使用OTS_SK_ENV环境变量名表示对应AccessKey Secret,请根据实际配置。

操作步骤

  1. 使用表格存储控制台或SDK创建源数据表的通道并记录通道ID,具体操作请分别参见快速入门通过SDK使用通道服务

  2. 使用SDK迁移数据。

    示例代码如下:

    public class TunnelTest {
    
        public static void main(String[] args){
           String accessKeyId = System.getenv("OTS_AK_ENV");
           String accessKeySecret = System.getenv("OTS_SK_ENV");
           TunnelClient tunnelClient = new TunnelClient("endpoint",
                   accessKeyId,accessKeySecret,"instanceName");
    
            TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    
            //tunnelId可以在表格存储控制台通道管理页面查看或者调用describeTunnelRequest查询。
            TunnelWorker worker = new TunnelWorker("tunnelId", tunnelClient, config);
            try {
                worker.connectAndWorking();
            } catch (Exception e) {
                e.printStackTrace();
                worker.shutdown();
                tunnelClient.shutdown();
            }
        }
    
        public static class SimpleProcessor implements IChannelProcessor{
        
           //目标tablestore连接对象。
           TunnelClient tunnelClient = new TunnelClient("endpoint",
                   "accessKeyId","accessKeySecret","instanceName");
                   
           @Override
            public void process(ProcessRecordsInput processRecordsInput) {
            
                //ProcessRecordsInput中返回了增量或全量数据。
                List<StreamRecord> list = processRecordsInput.getRecords();
                for(StreamRecord streamRecord : list){
                    switch (streamRecord.getRecordType()){
                        case PUT:
                            //自定义业务处理逻辑。
                            //putRow
                            break;
                        case UPDATE:
                            //updateRow
                            break;
                        case DELETE:
                            //deleteRow
                            break;
                    }
    
                    System.out.println(streamRecord.toString());
                }
            }
    
            @Override
            public void shutdown() {
                
            }
        }
    }

使用DataWorks或者DataX迁移同步

通过DataWorks或者DataX实现表格存储数据的迁移同步,此处以DataWorks为例介绍迁移操作。

步骤一:新增表格存储数据源

分别以源数据表和目标数据表所在实例新增表格存储数据源。

  1. 进入数据集成页面。

    1. 以项目管理员身份登录DataWorks控制台

    2. 在左侧导航栏,单击工作空间列表后,选择地域。

    3. 工作空间列表页面,在目标工作空间操作列选择快速进入>数据集成

  2. 在左侧导航栏,单击数据源

  3. 数据源页面,单击新增数据源

  4. 新增数据源对话框,找到Tablestore区块,单击Tablestore

  5. 新增OTS数据源对话框,根据下表配置数据源参数。

    参数

    说明

    数据源名称

    数据源名称必须以字母、数字、下划线(_)组合,且不能以数字和下划线(_)开头。

    数据源描述

    对数据源进行简单描述,不得超过80个字符。

    Endpoint

    Tablestore实例的服务地址。更多信息,请参见服务地址

    如果Tablestore实例和目标数据源的资源在同一个地域,填写VPC地址;如果Tablestore实例和目标数据源的资源不在同一个地域,填写公网地址。

    Table Store实例名称

    Tablestore实例的名称。更多信息,请参见实例

    AccessKey ID

    阿里云账号或者RAM用户的AccessKey ID和AccessKey Secret。获取方式请参见创建AccessKey

    AccessKey Secret

  6. 测试资源组连通性。

    创建数据源时,您需要测试资源组的连通性,以保证同步任务使用的资源组能够与数据源连通,否则将无法正常执行数据同步任务。

    重要
    • 数据同步时,一个任务只能使用一种资源组。资源组列表默认仅显示独享数据集成资源组,为确保数据同步的稳定性和性能要求,推荐使用独享数据集成资源组。

    • 如果未创建资源组,请单击新建独享数据集成资源组进行创建。具体操作,请参见新增和使用独享数据集成资源组

    单击相应资源组操作列的测试连通性,当连通状态为可连通时,表示连通成功。

  7. 测试连通性通过后,单击完成

    在数据源列表中,可以查看新建的数据源。

步骤三:新建同步任务节点

  1. 进入数据开发页面。

    1. 以项目管理员身份登录DataWorks控制台

    2. 选择地域,在左侧导航栏,单击工作空间列表

    3. 工作空间列表页面,在目标工作空间操作列选择快速进入>数据开发

  2. 在DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。

    如果需要新建业务流程,请参见创建业务流程

  3. 数据集成节点上右键选择新建节点 > 离线同步

  4. 新建节点对话框,选择路径并填写节点名称。

  5. 单击确认

    数据集成节点下会显示新建的离线同步节点。

步骤四:配置离线同步任务并启动

  1. 数据集成节点下,双击打开新建的离线同步任务节点。

  2. 配置同步网络链接。

    1. 选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

      重要

      数据同步任务的执行必须经过资源组来实现,请选择资源组并保证资源组与读写两端的数据源能联通访问。

    2. 网络与资源配置步骤,选择数据来源Tablestore,并选择数据源名称为新增的源数据源。

    3. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      重要

      请与新增数据源时选择的资源组保持一致。

    4. 选择数据去向Tablestore,并选择数据源名称为新增的目标数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    5. 测试可连通后,单击下一步

    6. 提示对话框,单击确认使用脚本模式

      重要
      • 表格存储仅支持脚本模式。当存在不支持向导模式的数据源时,如果继续编辑任务,将强制使用脚本模式进行编辑。

      • 任务转为脚本模式后,将无法转为向导模式。

  3. 配置任务并保存。

    全量数据的同步需要使用到Tablestore Reader与Tablestore Writer插件。脚本配置规则请参见Tablestore数据源

    1. 配置任务步骤,编辑脚本。

      • 配置Tablestore Reader

        Tablestore Reader插件实现了从Tablestore读取数据,通过您指定的抽取数据范围,可以方便地实现数据增量抽取的需求。具体操作,请参见附录:Reader脚本Demo与参数说明

      • 配置Tablestore Writer

        Tablestore Writer通过Tablestore官方Java SDK连接到Tablestore服务端,并通过SDK写入Tablestore服务端 。Tablestore Writer本身对于写入过程进行了诸多优化,包括写入超时重试、异常写入重试、批量提交等功能。具体操作,请参见附录:Writer脚本Demo与参数说明

    2. 按【Ctrl+S】保存脚本。

      说明

      执行后续操作时,如果未保存脚本,则系统会出现保存确认的提示,单击确认即可。

  4. 执行同步任务。

    说明

    全量数据一般只需要同步一次,无需配置调度属性。

    1. 单击1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e图标。

    2. 参数对话框,选择运行资源组的名称。

    3. 单击运行

      运行结束后,在同步任务的运行日志页签,单击Detail log url对应的链接后。在任务的详细运行日志页面,查看Current task status对应的状态。

      Current task status的值为FINISH时,表示任务运行完成。