通过DataWorks使用DistCp任务实现大规模文件迁移

当您需要大文件或多文件的纯文件复制时,可以使用DataWorks创建DistCp任务,该方式能够极大的提高同步传输的效率,实现跨文件系统、大规模数据迁移和同步需求。

背景信息

什么是DistCp任务

DistCp(Distributed Copy)是一种分布式数据拷贝工具,最初是 Apache Hadoop 生态系统中的核心组件,用于在 Hadoop 集群之间或 HDFS 内部进行大规模数据迁移。

在阿里云 DataWorks 环境中,您可以为DataWorks工作空间绑定E-MapReduce计算资源,使用Hadoop组件里的distcp 工具创建跨文件系统(如 OSS、HDFS等)的数据传输任务,实现企业级大批量文件数据迁移和同步。

DistCp任务的优势

  • 跨文件系统:

    支持多种文件系统间同步,包括:OSS、OSS-HDFS、HDFS、AWS S3、腾讯云COS等文件系统。

  • 分布式并行处理:

    通过多线程并行拷贝,利用集群资源加速传输,避免单点性能瓶颈。

适用场景

适用于大文件或多文件的纯文件复制传输,例如将数据从HDFS 迁移到阿里云 OSS。

前提条件

注意事项

  • 使用DataWorks创建DistCp任务基于EMR集群中的distcp工具,因此您需要确保DataWorks资源组与EMR集群网络连通,详情请参见网络连通方案

  • distcp工具用于数据来源与去向的数据传输,因此您需要确保EMR集群与数据来源和数据去向间的网络已连通。

  • 为确保任务顺利运行,请在ECS管理控制台为您所创建的EMR集群绑定的安全组添加安全组规则,开放ECS入方向的80323563710020端口,设置授权对象为资源组VPC的交换机网段。详情请参见添加安全组规则

一、DataWorks工作空间绑定EMR计算资源

创建DistCp任务需要使用EMR集群中的distcp工具,因此需要先创建EMR集群并将其绑定为DataWorks的计算资源。

  1. 创建EMR on ECS集群

    配置要点(以下仅为关键配置,其他参数保持默认或按需配置即可):

    • 业务场景选择数据湖

    • 专有网络交换机推荐与DataWorks资源组选择相同专有网络和交换机。

    • 您需要确保EMR集群配置的专有网络交换机与即将进行的数据传输来源和去向网络已连通。

  2. 绑定EMR计算资源

二、创建手动任务EMR Shell节点

DistCp任务需要在DataWorks数据开发中创建手动任务来执行。

  1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与运维 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 在左侧导航栏单击image,进入手动业务流程,然后单击新建 > 新建业务流程,自定义业务流程名称(本教程业务流程名称设置为distcp_flow)。

  3. 右键已创建的业务流程,选择新建节点 > EMR Shell,自定义节点名称(本教程节点名称设置为distcp_test)。

三、配置DistCp任务

EMR Shell节点编辑区域,配置如下DistCp任务内容。

cat>dataworks.distcp.json<<EOF
<同步任务配置JSON>
EOF

/opt/taobao/tbdpapp/emrwrapper/dataworks-distcp.sh dataworks.distcp.json

<同步任务配置JSON>部分需要按任务配置说明的格式要求进行配置。

本教程以HDFS同步到OSS为例,配置示例如下

{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "hdfs",
      "parameter": {
        "path": [
          "hdfs://****/tmp/tpcds-generate/10_20240802"
        ],
        "pattern": [
          "- *_SUCCESS"
        ]
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "oss",
      "parameter": {
        "path": "oss://****.cn-shanghai.oss-dls.aliyuncs.com/tmp/tpcds-generate/10_20240802_copy",
        "hadoopConfig": {
            "fs.oss.endpoint": "oss-cn-shanghai-internal.aliyuncs.com"
        },
        "bucket": "****",
        "writeMode": "overwrite",
        "bufferSize": 8192
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "mbps": 10,
    "skipCrcCheck": true,
    "concurrent": 10,
    "ignoreFailures": "true"
  }
}

任务配置说明

DistCp任务的JSON配置文件主要包括reader、writer、setting三部分。

  • reader部分主要配置读取端的参数配置,例如读取哪些文件。

  • writer部分主要配置写入端的参数配置,例如写入目录、写入策略等。

  • setting部分主要配置任务粒度的整体参数,例如流控、是否容忍脏数据等。

JSON格式如下:

{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "hdfs",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "hdfs",
      "parameter": {},
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {}
}

DataWorks中,readerwritersetting部分的配置方法与开源hadoop distcp类似,具体参数说明如下:

reader端配置

参数

说明

配置示例

path

用于控制数据同步读取的目录,来自源头的数据会按照目录层次依次读取,写入到对应的目标文件系统。

配置格式:一个字符串数组,表示可以读取多个源头目录。

默认值:无。

"path": [ "/tmp/app_data/" ]

pattern

用于控制数据读取的文件过滤器,在源头目录path下,只有通过过滤器校验的文件,才会传输到目标端。pattern过滤器会对源头path每个目录地址生效。您可以控制哪些文件应该被包括(include)或排除(exclude)在复制操作中。

配置格式:一个字符串数组,每个过滤元素前面的+表示包含(include),-表示排除(exclude)。+-后面跟一个空格,空格后面是每个过滤器规则(可以是正在表达式)。

默认值:空,表示不做过滤,全部传输。

"pattern": [ "+ a.txt", "+ c/d.txt", "- *.log"]

  • 表示包含a.txt,包含子目录c内的d.txt,排除所有.log文件。

  • 包含与排除同时命中一个文件,包含生效

  • 包含与排除都没有命中的文件,执行同步传输。

splitMode

用于控制数据同步的策略,主要会影响数据读取端的切分策略。支持的切分并发策略有:

  • dynamic:动态调控策略,每个并发传输的数据量可以不同,运行更快的并发会传输更多的字节。

  • uniformsize:统一大小调控策略,任务切分会考虑文件大小,让每个并发同步一样总字节大小的文件。

默认值:uniformsize,默认使用统一大小策略。

"splitMode": "dynamic"

listFileConcurrent

用于控制列出待同步文件的并发数,更大的并发在寻找文件时更快。

默认值:1,最大允许设置为40,超过40会被设置为40。

"listFileConcurrent": 10

writer端配置

参数

说明

配置示例

path

用于控制数据同步写入的目录,来自源头的数据会按照目录层次依次写入到目标path目标下。

配置格式:一个字符串。

默认值:无。

"path": "/tmp/app_data/"

enableRelative

用于控制数据同步写入的目录行为。

  • true:表示在目标目录下,同步建立源文件所在目录的完整目录层级。

    例如:来源hdfs://hdfs:port/a/b/c/d.txt,目标oss://oss:port/,同步后写入文件为oss://oss:port/a/b/c/d.txt

  • false:表示以目标目录为根,不建立源文件所在目录,只同步文件。

    例如:来源hdfs://hdfs:port/a/b/c/d.txt,目标oss://oss:port/,同步后写入文件为oss://oss:port/d.txt

默认值:false

"enableRelative": "true"

writeMode

表示数据写入模式,目前支持三种写出模式:overwriteupdateappend,各个参数含义如下:

  • overwrite:在目标路径中强制覆盖现有文件。如果目标路径中已经包含与源路径中同名的文件,此选项将会覆盖这些文件。

  • update:仅更新目标路径中较旧或不存在的文件。此选项会跳过在目标路径中已经是最新版本的文件,避免不必要的复制操作,可以节省时间和带宽。

  • append:将源文件内容追加到目标文件中,而不是替换它们。主要用于日志文件和其他需要累积数据的文件。

    说明

    append模式下,skipCrcCheck会被强制设置为true

默认值:无,此参数为必选参数。

"writeMode": "update"

preserveStatus

用于控制文件同步时,是否同步文件元数据。取值范围:

  • REPLICATION

  • BLOCKSIZE

  • USER

  • GROUP

  • PERMISSION

  • CHECKSUMTYPE

  • ACL

  • XATTR

  • TIMES

  • OWNER

  • REPLICATION FACTOR

配置格式:一个字符串数组,表示可以同步多个元数据。

默认值:空,表示不同步元数据。

"preserveStatus": ["ACL"]

chunkSize

表示在文件传输时,是否将文件根据分块大小做切分传输,传输到目标端后再做文件内容合并。只有源文件系统支持getBlockLocations方法,目标文件系统支持concat方法时才可用。这里的数字表示一个文件包含的block数,每个block字节数是hdfs的系统配置项,例如64MB。

默认值:0,表示不切分。

"chunkSize": 0

bufferSize

表示数据缓冲的大小,会影响数据写入的效率。

默认值:8×1024,即8KB。

"bufferSize": 8192

deleteMissingSource

表示目标端文件存在,但是源头不存在的情况下,是否删除目标端文件和目录。

取值范围:

  • true:删除目标端多余的文件和目录。

  • false:不删除目标端多余的文件和目录。

默认值:false

"deleteMissingSource": "false"

hadoopConfig

您可以在hadoopConfig中配置访问目标hdfs的高级配置,例如oss、oss-hdfs的访问地址:

  • oss配置举例:

    {
        "fs.oss.endpoint": "oss-cn-shanghai-internal.aliyuncs.com"
    }
  • oss-hdfs配置举例:

    {
        "fs.oss.endpoint": "cn-shanghai.oss-dls.aliyuncs.com",
        "fs.AbstractFileSystem.oss.impl": "com.aliyun.jindodata.oss.OSS",
        "fs.oss.impl": "com.aliyun.jindodata.oss.JindoOssFileSystem",
        "fs.oss.credentials.provider": "com.aliyun.jindodata.auth.SimpleAliyunCredentialsProvider"
    }
  • 如果访问oss、oss-hdfs需要制定特定账号AccessKey,可以配置:

    {
        "fs.oss.endpoint": "oss-cn-shanghai-internal.aliyuncs.com",
        "fs.oss.accessKeyId": "xxxxx",
        "fs.oss.accessKeySecret": "xxxxx"
    }

setting端配置

参数

说明

配置示例

skipCrcCheck

用于表示是否跳过源头和目标的CRC校验。

重要

writeModeappend的情况下,skipCrcCheck会被强制设置为true

取值范围:

  • true:跳过校验。

  • false:不跳过校验。

默认值:false。

"skipCrcCheck": "false"

checksumCombineMode

在开启CRC检查的情况下,如果源和目标是异构系统,可能出现报错:

Their checksum algorithms may be incompatible You can choose file-level checksum validation via -Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes or filesystems are different. Or you can skip checksum-checks altogether  with -skipcrccheck.
 (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)

您可以设置"skipCrcCheck": "true"跳过CRC检查。

也可以使用此参数,确定在使用 DistCp 进行文件复制时,源存储系统与目标存储系统之间校验和的生成和合并策略。取值范围:

  • MD5MD5CRC:默认行为,采用双重MD5+CRC校验方式。

  • COMPOSITE_CRC:组合循环冗余校验(CRC)方法,经测试在 HDFS -> OSS-HDFS 的场景下,可以通过目标端校验机制。

"checksumCombineMode": "COMPOSITE_CRC"

ignoreFailures

用于控制是否忽略同步传输中的错误,即个别文件同步失败不中断任务执行。

取值范围:

  • true:中间同步失败,不中断任务执行。

  • false:中间同步失败,中断任务执行。

默认值:false

"ignoreFailures": "true"

concurrent

用于表示任务传输的并发度,最多会有concurrent个文件处于传输状态。在底层上,concurrentMapReduce程序的Map最大并行度。

默认值:10,表示传输的并发度是10。

"concurrent": 10

mbps

用于控制任务传输中,每个并发的流速上限,同步程序会保障每个并发传输的速度上限在mbps之下。mbps单位为MB/s,整个任务的速度上限为mbps × concurrent

取值范围:

  • 配置≤0,表示不限速。

  • 配置>0,表示限速。可以设置为浮点数,例如:5.5表示限速5.5MB/s

默认值:-1。

"mbps": 10

queue

用于表示任务运行的队列,文件同步任务在底层是一个MapReduce程序,通过queue限制任务运行的队列(配置为yarn的队列名),可以起到监控和限制任务资源使用的目的。

默认值:无。

"queue": "distcpQueue"

三、发布任务

单击EMR Shell节点顶部工具栏的保存(image),然后单击提交(image)。如果是标准模式环境,还需要将节点发布至生产环境,详情请参见发布任务

四、运行任务

  1. 单击页面左上角的image,选择全部产品 > 数据开发与运维 > 运维中心,进入运维中心。

  2. 在运维中心左侧导航栏单击手动任务运维 > 手动任务,然后在页面右侧切换至手动业务流程,找到已创建的EMR Shell节点(本教程节点名为distcp_test)。

  3. 右键EMR Shell节点,选择运行

    image

五、查看日志和运维

节点运行后,您可以在左侧导航栏单击手动任务运维 > 手动实例,找到已经运行的任务实例,右键选择查看运行日志停止运行等运维操作。

image