将表格存储数据表中数据同步到另一个数据表

使用通道服务、DataWorks或者DataX将表格存储数据表中的数据同步到另一个数据表。

前提条件

已创建目标数据表,目标数据表的列必须与源数据表中待迁移的列一一对应。具体操作,请参见创建数据表

说明

如果要实现跨账号、跨地域数据迁移,请使用DataX工具通过互联网或者通过云企业网连通VPC进行操作。关于使用云企业网的具体操作,请参见云企业网快速入门

使用通道服务迁移同步

创建源数据表的通道后,使用SDK进行迁移同步。迁移过程中可以自定义业务处理逻辑对数据进行处理。

前提条件

  • 已确定要使用的Endpoint。具体操作,请参见初始化OTSClient

  • 已配置密钥。具体操作,请参见初始化OTSClient

  • 已将密钥配置到环境变量中。具体操作,请参见初始化OTSClient

    表格存储使用OTS_AK_ENV环境变量名表示阿里云账号或者RAM用户的AccessKey ID,使用OTS_SK_ENV环境变量名表示对应AccessKey Secret,请根据实际配置。

操作步骤

  1. 使用表格存储控制台或SDK创建源数据表的通道并记录通道ID,具体操作请分别参见快速入门通过SDK使用通道服务

  2. 使用SDK迁移数据。

    示例代码如下:

    public class TunnelTest {
    
        public static void main(String[] args){
           String accessKeyId = System.getenv("OTS_AK_ENV");
           String accessKeySecret = System.getenv("OTS_SK_ENV");
           TunnelClient tunnelClient = new TunnelClient("endpoint",
                   accessKeyId,accessKeySecret,"instanceName");
    
            TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    
            //tunnelId可以在表格存储控制台通道管理页面查看或者调用describeTunnelRequest查询。
            TunnelWorker worker = new TunnelWorker("tunnelId", tunnelClient, config);
            try {
                worker.connectAndWorking();
            } catch (Exception e) {
                e.printStackTrace();
                worker.shutdown();
                tunnelClient.shutdown();
            }
        }
    
        public static class SimpleProcessor implements IChannelProcessor{
        
           //目标tablestore连接对象。
           TunnelClient tunnelClient = new TunnelClient("endpoint",
                   "accessKeyId","accessKeySecret","instanceName");
                   
           @Override
            public void process(ProcessRecordsInput processRecordsInput) {
            
                //ProcessRecordsInput中返回了增量或全量数据。
                List<StreamRecord> list = processRecordsInput.getRecords();
                for(StreamRecord streamRecord : list){
                    switch (streamRecord.getRecordType()){
                        case PUT:
                            //自定义业务处理逻辑。
                            //putRow
                            break;
                        case UPDATE:
                            //updateRow
                            break;
                        case DELETE:
                            //deleteRow
                            break;
                    }
    
                    System.out.println(streamRecord.toString());
                }
            }
    
            @Override
            public void shutdown() {
                
            }
        }
    }

使用DataWorks或者DataX迁移同步

通过DataWorks或者DataX实现表格存储数据的迁移同步,此处以DataWorks为例介绍迁移操作。

步骤一:新增表格存储数据源

分别以源数据表和目标数据表所在实例新增表格存储数据源。

  1. 进入数据集成页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据集成,在下拉框中选择对应工作空间后单击进入数据集成

  2. 在左侧导航栏,单击数据源

  3. 数据源列表页面,单击新增数据源

  4. 新增数据源对话框,找到Tablestore区块,单击Tablestore

  5. 新增OTS数据源对话框,根据下表配置数据源参数。

    参数

    说明

    数据源名称

    数据源名称必须以字母、数字、下划线(_)组合,且不能以数字和下划线(_)开头。

    数据源描述

    对数据源进行简单描述,不得超过80个字符。

    Endpoint

    Tablestore实例的服务地址。更多信息,请参见服务地址

    如果Tablestore实例和目标数据源的资源在同一个地域,填写VPC地址;如果Tablestore实例和目标数据源的资源不在同一个地域,填写公网地址。

    Table Store实例名称

    Tablestore实例的名称。更多信息,请参见实例

    AccessKey ID

    阿里云账号或者RAM用户的AccessKey IDAccessKey Secret。获取方式请参见创建AccessKey

    AccessKey Secret

  6. 测试资源组连通性。

    创建数据源时,您需要测试资源组的连通性,以保证同步任务使用的资源组能够与数据源连通,否则将无法正常执行数据同步任务。

    重要

    数据同步时,一个任务只能使用一种资源组。资源组列表默认显示仅数据集成公共资源组。为确保数据同步的稳定性和性能要求,推荐使用独享数据集成资源组。

    1. 单击前往购买进行全新创建并绑定资源组到工作空间或单击绑定已购资源组为工作空间绑定已有资源组。具体操作,请参见新增和使用独享数据集成资源组

    2. 待资源组启动成功后,单击相应资源组连通状态(生产环境)列的测试连通性

      当连通状态显示为可连通时,表示连通成功。

  7. 测试连通性通过后,单击完成

    在数据源列表中,可以查看新建的数据源。

步骤二:新建同步任务节点

  1. 进入数据开发页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。

    如果需要新建业务流程,请参见创建业务流程

  3. 数据集成节点上右键选择新建节点 > 离线同步

  4. 新建节点对话框,选择路径并填写节点名称。

  5. 单击确认

    数据集成节点下会显示新建的离线同步节点。

步骤三:配置离线同步任务并启动

  1. 数据集成节点下,双击打开新建的离线同步任务节点。

  2. 配置同步网络链接。

    选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

    重要

    数据同步任务的执行必须经过资源组来实现,请选择资源组并保证资源组与读写两端的数据源能联通访问。

    1. 网络与资源配置步骤,选择数据来源Tablestore,并选择数据源名称为新增的源数据源。

    2. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      重要

      请与新增数据源时选择的资源组保持一致。

    3. 选择数据去向Tablestore,并选择数据源名称为新增的目标数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    4. 测试可连通后,单击下一步

    5. 提示对话框,单击确认使用脚本模式

      重要
      • 表格存储仅支持脚本模式。当存在不支持向导模式的数据源时,如果继续编辑任务,将强制使用脚本模式进行编辑。

      • 任务转为脚本模式后,将无法转为向导模式。

  3. 配置任务并保存。

    全量数据的同步需要使用到Tablestore ReaderTablestore Writer插件。脚本配置规则请参见Tablestore数据源

    1. 配置任务步骤,编辑脚本。

      • 配置Tablestore Reader

        Tablestore Reader插件实现了从Tablestore读取数据,通过您指定的抽取数据范围,可以方便地实现数据增量抽取的需求。具体操作,请参见附录一:Reader脚本Demo与参数说明

      • 配置Tablestore Writer

        Tablestore Writer通过Tablestore官方Java SDK连接到Tablestore服务端,并通过SDK写入Tablestore服务端 。Tablestore Writer本身对于写入过程进行了诸多优化,包括写入超时重试、异常写入重试、批量提交等功能。具体操作,请参见附录二:Writer脚本Demo与参数说明

    2. 按【Ctrl+S】保存脚本。

      说明

      执行后续操作时,如果未保存脚本,则系统会出现保存确认的提示,单击确认即可。

  4. 执行同步任务。

    说明

    全量数据一般只需要同步一次,无需配置调度属性。

    1. 单击1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e图标。

    2. 参数对话框,选择运行资源组的名称。

    3. 单击运行

      运行结束后,在同步任务的运行日志页签,单击Detail log url对应的链接后。在任务的详细运行日志页面,查看Current task status对应的状态。

      Current task status的值为FINISH时,表示任务运行完成。