数据表同步到数据表

更新时间:2025-02-28 01:55:44

本文介绍如何将表格存储中数据表的数据迁移或同步到另一个数据表。您可以通过通道服务、DataWorks、DataX或命令行工具等方式实现此操作。

前提条件

  • 已获取源数据表和目标数据表的实例名称、实例访问地址、地域ID等信息。

  • 已创建目标数据表,并且目标数据表的主键数量和类型与源表保持一致。具体操作,请参见创建数据表

  • 已获取 AccessKey 信息。请使用阿里云账号或RAM用户的 AccessKey 进行配置。获取AccessKey的具体操作,请参见如何获取AccessKey

    重要

    出于安全考虑,强烈建议您通过RAM用户使用表格存储功能。您可以创建RAM用户、授予该用户管理表格存储权限( AliyunOTSFullAccess )并为该RAM用户创建AccessKey。具体操作,请参见使用RAM用户访问密钥访问表格存储

使用通道服务同步

创建源数据表的通道后,使用SDK进行数据同步。同步过程中可以自定义业务处理逻辑对数据进行处理。

  1. 创建源数据表的通道并记录通道ID,具体操作,请参见创建数据通道

  2. 使用SDK同步数据。

    Java示例代码如下:

    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.TunnelClient;
    import com.alicloud.openservices.tablestore.model.StreamRecord;
    import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
    import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
    import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
    import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
    
    import java.util.List;
    
    public class TunnelSample {
    
        public static void main(String[] args) {
            // 源表的实例名称
            final String sourceInstanceName = "sourceInstanceName";
            // 源表的实例访问地址
            final String sourceEndpoint = "sourceEndpoint";
            // 获取环境变量中源表所在实例的访问凭证 AccessKey ID 和 AccessKey Secret
            final String sourceAccessKeyId = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_ID");
            final String sourceKeySecret = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_SECRET");
    
            // 初始化 TunnelClient
            TunnelClient tunnelClient = new TunnelClient(sourceEndpoint, sourceAccessKeyId, sourceKeySecret, sourceInstanceName);
    
            // 配置 TunnelWorkerConfig
            TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    
            // 配置 TunnelWorker,并启动自动化的数据处理任务。
            // tunnelId,通道 ID。该值可以在表格存储控制台的实时消费通道页签查看,或通过ListTunnel或者DescribeTunnel获取。
            TunnelWorker worker = new TunnelWorker("tunnelId", tunnelClient, config);
            try {
                worker.connectAndWorking();
            } catch (Exception e) {
                e.printStackTrace();
                worker.shutdown();
                tunnelClient.shutdown();
            }
        }
    
        public static class SimpleProcessor implements IChannelProcessor {
            // 目标表的实例名称
            final String targetInstanceName = "targetInstanceName";
            // 目标表的实例访问地址
            final String targetEndpoint = "targetEndpoint";
            // 获取环境变量中目标表所在实例的访问凭证 AccessKey ID 和 AccessKey Secret
            final String targetAccessKeyId = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_ID");
            final String targetKeySecret = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_SECRET");
    
            // 初始化目标表所在实例的 Tablestore Client
            SyncClient client = new SyncClient(targetEndpoint, targetAccessKeyId, targetKeySecret, targetInstanceName);
    
            @Override
            public void process(ProcessRecordsInput processRecordsInput) {
                // ProcessRecordsInput中返回了增量或全量数据。
                List<StreamRecord> list = processRecordsInput.getRecords();
                for (StreamRecord streamRecord : list) {
                    // 自定义业务处理逻辑。
                    switch (streamRecord.getRecordType()) {
                        case PUT:
                            // putRow
                            break;
                        case UPDATE:
                            // updateRow
                            break;
                        case DELETE:
                            // deleteRow
                            break;
                    }
                    System.out.println(streamRecord.toString());
                }
            }
    
            @Override
            public void shutdown() {
    
            }
        }
    }

使用DataWorks或者DataX同步

通过DataWorks或者DataX实现表格存储中数据表的同步,此处以DataWorks为例介绍具体操作。

准备工作

已开通DataWorks服务并创建工作空间。具体操作,请参见开通DataWorks服务创建工作空间

步骤一:新增表格存储数据源

分别为源数据表和目标数据表所在的实例新增表格存储数据源。

  1. 进入数据集成页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据集成 > 数据集成,在下拉框中选择对应工作空间后单击进入数据集成

  2. 在左侧导航栏,单击数据源

  3. 数据源列表页面,单击新增数据源

  4. 新增数据源对话框,找到Tablestore区块,单击Tablestore

  5. 新增OTS数据源对话框,根据下表配置数据源参数。

    参数

    说明

    参数

    说明

    数据源名称

    数据源名称必须以字母、数字、下划线(_)组合,且不能以数字和下划线(_)开头。

    数据源描述

    对数据源进行简单描述,不得超过80个字符。

    地域

    选择Tablestore实例所属地域。

    Table Store实例名称

    Tablestore实例的名称。更多信息,请参见实例

    Endpoint

    Tablestore实例的服务地址,推荐使用VPC地址

    重要

    本文以Tablestore实例和DataWorks工作空间在同一阿里云主账号的同一地域下为例进行说明。更多场景信息,请参见各场景网络连通配置示例

    AccessKey ID

    阿里云账号或者RAM用户的AccessKey IDAccessKey Secret。获取方式请参见创建AccessKey

    AccessKey Secret

  6. 测试资源组连通性。

    创建数据源时,您需要测试资源组的连通性,以保证同步任务使用的资源组能够与数据源连通,否则将无法正常执行数据同步任务。

    1. (可选)购买并绑定资源组至当前DataWorks工作空间。具体操作,请参见新增和使用Serverless资源组

      不推荐使用旧版资源组(独享资源组和公共资源组),相较于旧版资源组,Serverless资源组支持的能力更丰富、售卖方式更统一、能有效利用资源碎片避免浪费,因此推荐您使用Serverless资源组。

      说明

      Serverless资源组默认不具备公网访问能力。需要为绑定的VPC配置公网NAT网关EIP后,才支持公网访问数据源。

    2. 待资源组启动成功后,在连接配置区域,单击相应资源组连通状态列的测试连通性

    3. 测试连通性通过后,连通状态显示可连通,单击完成

      在数据源列表中,可以查看新建的数据源。

      说明

      如果显示无法连通,表示资源组与数据源无法连通,后续相应数据源任务将无法正常执行。您可以参考以下思路排查处理。

      • 根据右侧弹出的连通性诊断工具窗口,自助解决连通性问题。

      • 如果连通性诊断工具未给出具体解决办法,请检查您设置的账号、密码、连接地址等参数,以及确保将资源组的IP地址加入到数据源的白名单中。更多信息,请参见网络连通方案

步骤二:新建同步任务节点

  1. 进入数据开发页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与运维 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. DataStudio控制台的数据开发页面,单击业务流程节点下的目标业务流程。

    如果需要新建业务流程,请参见创建业务流程

  3. 数据集成节点上右键选择新建节点 > 离线同步

  4. 新建节点对话框,选择路径并填写节点名称。

  5. 单击确认

    数据集成节点下会显示新建的离线同步节点。

步骤三:配置离线同步任务并启动

  1. 数据集成节点下,双击打开新建的离线同步任务节点。

  2. 配置网络与资源。

    选择离线同步任务的数据来源、数据去向以及用于执行同步任务的资源组,并测试连通性。

    重要

    数据同步任务的执行必须经过资源组来实现,请选择资源组,并保证资源组与读写两端的数据源可以联通访问。

    1. 网络与资源配置步骤,选择数据来源Tablestore,并选择数据源名称为新增的源数据源。

    2. 选择资源组。

      选择资源组后,系统会显示资源组的地域、规格等信息以及自动测试资源组与所选数据源之间连通性。

      重要

      请与新增数据源时选择的资源组保持一致。

    3. 选择数据去向Tablestore,并选择数据源名称为新增的目标数据源。

      系统会自动测试资源组与所选数据源之间连通性。

    4. 测试可连通后,单击下一步

  3. 配置任务并保存。

    脚本模式
    向导模式
    1. 配置任务步骤,单击image.png图标,然后在弹出的对话框中单击确认

    2. 在脚本配置页面,配置脚本。脚本配置规则请参见Tablestore数据源

      • 配置Tablestore Reader

        Tablestore Reader插件实现了从Tablestore读取数据,通过您指定的抽取数据范围,可以方便地实现数据增量抽取的需求。具体操作,请参见Reader脚本Demo与参数说明

      • 配置Tablestore Writer

        Tablestore Writer通过Tablestore官方Java SDK连接到Tablestore服务端,并通过SDK写入Tablestore服务端 。Tablestore Writer本身对于写入过程进行了诸多优化,包括写入超时重试、异常写入重试、批量提交等功能。具体操作,请参见Writer脚本Demo与参数说明

    3. 单击image.png图标,保存配置。

    1. 配置任务步骤的配置数据来源与去向区域,根据实际情况配置数据来源和数据去向。

      数据来源
      数据去向

      参数

      说明

      源数据表名称。

      主键区间分布(起始)

      数据读取的起始主键和结束主键,格式为JSON数组。

      起始主键和结束主键需要是有效的主键或者是由INF_MININF_MAX类型组成的虚拟点,虚拟点的列数必须与主键相同。其中INF_MIN表示无限小,任何类型的值都比它大;INF_MAX表示无限大,任何类型的值都比它小。

      数据表中的行按主键从小到大排序,读取范围是一个左闭右开的区间,返回的数据是大于等于起始主键且小于结束主键的所有行。

      主键区间分布(结束)

      切分配置信息

      自定义切分配置信息,普通情况下不建议配置。

      Tablestore数据存储发生热点,且使用Tablestore Reader自动切分的策略不能生效时,建议使用自定义的切分规则。切分指定的是在主键起始和结束区间内的切分点,仅配置切分键,无需指定全部的主键。格式为JSON数组。

      参数

      说明

      目标数据表名称。

      主键信息

      目标数据表的主键信息。

      写入模式

      数据写入表格存储的模式,支持以下两种模式:

      • PutRow:对应于Tablestore API PutRow,插入数据到指定的行。如果该行不存在,则新增一行。如果该行存在,则覆盖原有行。

      • UpdateRow:对应于Tablestore API UpdateRow,更新指定行的数据。如果该行不存在,则新增一行。如果该行存在,则根据请求的内容在这一行中新增、修改或者删除指定列的值。

    2. 配置字段映射。

      选择数据来源和数据去向后,需要指定来源字段目标字段的映射关系,配置字段映射关系后,任务将根据字段映射关系,将源表字段写入目标表对应类型的字段中。

      image

    3. 配置通道控制。

      您可通过通道配置,控制数据同步过程相关属性。相关参数说明详情可参见离线同步并发和限流之间的关系

    4. 单击image.png图标,保存配置。

  4. 执行同步任务。

    说明

    由于全量数据一般只需要同步一次,所以无需配置调度属性。

    1. 单击1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e图标。

    2. 参数对话框,选择运行资源组的名称。

    3. 单击运行

      运行结束后,在同步任务的结果页签,单击Detail log url对应的链接后。在任务的详细运行日志页面,查看Current task status对应的状态。

      Current task status的值为FINISH时,表示任务运行完成。

使用命令行工具迁移

重要

在使用命令行工具进行数据迁移时,您需要手动将源表的数据导出为本地JSON文件,随后再将其导入到目标表。此方法仅适用于少量数据的迁移场景。针对大规模数据迁移,不建议采用此方法。

准备工作

已下载命令行工具。具体操作,请参见下载命令行工具

步骤一:导出源表数据

  1. 启动命令行工具,并配置源表所在实例的接入信息。具体操作,请参见启动并配置接入信息

    通过config命令配置接入信息。

    执行前请使用源表所在的实例访问地址、实例名称、AccessKey ID、AccessKey Secret替换命令中的endpoint、instance、id、key。
    config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************
  2. 导出数据。

    1. 执行use命令以使用源表。此处以source_table为例。

      use --wc -t source_table
    2. 导出源表中的数据到本地JSON文件中。具体操作,请参见导出数据

      scan -o /tmp/sourceData.json

步骤二 :导入目标表数据

  1. 配置目标表所在实例的接入信息。

    通过config命令配置接入信息。

    执行前请使用目标表所在的实例访问地址、实例名称、AccessKey ID、AccessKey Secret替换命令中的endpoint、instance、id、key。
    config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************
  2. 导入数据。

    1. 执行use命令以使用目标表。此处以target_table为例。

      use --wc -t target_table
    2. 导入本地JSON文件中的数据到目标表中。具体操作,请参见导入数据

      import -i /tmp/sourceData.json 

相关文档

如果要实现跨账号、跨地域数据迁移,您也可以使用DataX工具通过互联网或者通过云企业网连接VPC进行操作。关于使用云企业网的具体操作,请参见云企业网快速入门

  • 本页导读 (1)
  • 前提条件
  • 使用通道服务同步
  • 使用DataWorks或者DataX同步
  • 准备工作
  • 步骤一:新增表格存储数据源
  • 步骤二:新建同步任务节点
  • 步骤三:配置离线同步任务并启动
  • 使用命令行工具迁移
  • 准备工作
  • 步骤一:导出源表数据
  • 步骤二 :导入目标表数据
  • 相关文档
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等