本文介绍如何使用阿里云Jindo DistCp从HDFS迁移数据到OSS。

背景信息

在传统大数据领域,HDFS经常作为大规模数据的底层存储。在进行数据迁移、数据拷贝的场景中,最常用的是Hadoop自带的DistCp工具。但是该工具不能很好利用对象存储OSS的特性,导致效率低下并且不能保证数据一致性。此外,该工具提供的功能选项较单一,无法很好地满足用户的需求。

阿里云Jindo DistCp(分布式文件拷贝工具)用于大规模集群内部或集群之间拷贝文件。Jindo DistCp使用MapReduce实现文件分发,错误处理和恢复,把文件和目录的列表作为MapReduce任务的输入,每个任务会完成源列表中部分文件的拷贝。全量支持HDFS之间、HDFS与OSS之间、以及OSS之间的数据拷贝场景,提供多种个性化拷贝参数和多种拷贝策略。

相对于Hadoop DistCp,使用阿里云Jindo DistCp从HDFS迁移数据到OSS具有以下优势:
  • 效率高,在测试场景中最高可达到1.59倍的加速。
  • 基本功能丰富,提供多种拷贝方式和场景优化策略。
  • 深度结合OSS,对文件提供归档、压缩等操作。
  • 实现No-Rename拷贝,保证数据一致性。
  • 场景全面,可完全替代Hadoop DistCp,目前支持Hadoop2.7+和Hadoop3.x。

前提条件

  • 如果您使用的是自建ECS集群,需要具备Hadoop2.7+或Hadoop3.x环境以及进行MapReduce作业的能力。
  • 如果您使用的是阿里云E-MapReduce:
    • 对于EMR3.28.0/bigboot2.7.0及以上的版本,可以通过Shell命令的方式使用Jindo DistCp。更多信息,请参见Jindo DistCp使用说明
    • 对于EMR3.28.0/bigboot2.7.0以下的版本,可能会存在一定的兼容性问题,您可以通过提交工单申请处理。

步骤一:下载JAR包

  1. 下载最新版本的Jindosdk-${version}.tar.gz。下载地址,请参见GitHub
  2. 下载tools文件夹下的jindo-distcp-tool-${version}.jar。

步骤二:配置OSS的AccessKey

您可以通过以下任意方式配置AccessKey:

  • 在示例命令中配置AccessKey
    hadoop jar jindo-distcp-tool-${version}.jar --src /data/incoming --dest oss://examplebucket/incoming --hadoopConf fs.oss.accessKeyId=LTAI5t7h6SgiLSganP2m**** --hadoopConf fs.oss.accessKeySecret=KZo149BD9GLPNiDIEmdQ7dyNKG**** ---hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --parallelism 10
  • 通过配置文件预先配置AccessKey

    JindoDistCp支持将OSS的AccessKey预先配置在Hadoop的core-site.xml文件中。

    <configuration>
        <property>
            <name>fs.oss.accessKeyId</name>
            <value>xxx</value>
        </property>
    
        <property>
            <name>fs.oss.accessKeySecret</name>
            <value>xxx</value>
        </property>
    
        <property>
            <name>fs.oss.endpoint</name>
            <!-- 阿里云ECS环境下推荐使用内网OSS Endpoint,即oss-cn-xxx-internal.aliyuncs.com -->
            <value>oss-cn-xxx.aliyuncs.com</value>
        </property>
    </configuration>
  • 配置免密功能

    配置免密功能,避免明文保存AccessKey,提高安全性。具体操作,请参见使用JindoFS SDK免密功能

步骤三:迁移或拷贝数据

  • 全量迁移或拷贝数据

    将HDFS指定目录/data/incoming下的数据全量迁移或拷贝到OSS目标路径oss://examplebucket/incoming/,示例命令如下:

    hadoop jar jindo-distcp-tool-${version}.jar --src /data/incoming --dest oss://examplebucket/incoming --hadoopConf fs.oss.accessKeyId=LTAI5t7h6SgiLSganP2m**** --hadoopConf fs.oss.accessKeySecret=KZo149BD9GLPNiDIEmdQ7dyNKG**** ---hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --parallelism 10

    示例中涉及的各参数或选项说明如下:

    参数及选项 说明 示例
    --src 待迁移或拷贝的HDFS数据所在的路径。 /data/incoming
    --dest OSS中存放迁移或拷贝数据的目标路径。 oss://examplebucket/incoming
    --hadoopConf 指定拥有访问OSS权限的AccessKey ID、AccessKey Secret以及Endpoint。
    • 关于获取AccessKey ID以及AccessKey Secret的具体操作,请参见创建AccessKey
    • 关于OSS支持的地域(Region)和对应的访问域名(Endopoint)列表信息,请参见访问域名和数据中心
      重要 ECS环境下推荐使用内网 ossEndPoint,即oss-cn-xxx-internal.aliyuncs.com
    --hadoopConf fs.oss.accessKeyId=LTAI5t7h6SgiLSganP2m**** --hadoopConf fs.oss.accessKeySecret=KZo149BD9GLPNiDIEmdQ7dyNKG**** ---hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com
    --parallelism 根据集群资源调整任务并发数。 10
  • 增量迁移或拷贝数据

    如果您仅希望拷贝在上一次全量迁移或拷贝后源路径下新增的数据,此时您可以结合--update选项完成数据的增量迁移或拷贝。

    将HDFS指定目录/data/incoming下的数据增量迁移或拷贝到OSS目标路径oss://examplebucket/incoming,示例命令如下:

    hadoop jar jindo-distcp-tool-${version}.jar --src /data/incoming --dest oss://examplebucket/incoming --hadoopConf fs.oss.accessKeyId=LTAI5t7h6SgiLSganP2m**** --hadoopConf fs.oss.accessKeySecret=KZo149BD9GLPNiDIEmdQ7dyNKG**** ---hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --update --parallelism 10

    使用--update选项时,默认开启校验和Checksum。开启后,DistCp将对源路径和目标路径的文件名称、文件大小以及文件的Checksum进行比较。如果源路径或目标路径下的文件名称、文件大小或者文件的Checksum不一致时,将自动触发增量迁移或拷贝任务。

    如果您不需要对源路径和目标路径的文件的Checksum进行比较,请增加--disableChecksum选项关闭Checksum校验,示例命令如下:

    hadoop jar jindo-distcp-tool-${version}.jar --src /data/incoming --dest oss://examplebucket/incoming --hadoopConf fs.oss.accessKeyId=LTAI5t7h6SgiLSganP2m**** --hadoopConf fs.oss.accessKeySecret=KZo149BD9GLPNiDIEmdQ7dyNKG**** ---hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --update --disableChecksum --parallelism 10

附录一:Jindo DistCp支持的参数及选项

Jindo DistCp提供一系列的参数及选项。您可以通过以下命令获取各参数及选项的具体用法。

hadoop jar jindo-distcp-tool-${version}.jar --help

各参数及选项的含义及其示例如下表所示。

参数及选项 说明 示例值
--src 指定拷贝的源路径。 --src oss://examplebucket/sourcedir
--dest 指定拷贝的目标路径。 --dest oss://examplebucket/destdir
--bandWidth 指定本次DistCp任务所用的单机带宽,单位为 MB。 --bandWidth 6
--codec 指定文件的压缩方式。当前版本支持编解码器gzip、gz、lzo、lzop和snappy,以及关键字none和keep。关键字含义如下:
  • none:保存为未压缩的文件。如果文件已压缩,则Jindo DistCp会将其解压缩。
  • keep(默认值):不更改文件压缩形态。
说明 如果您需要在开源Hadoop集群环境中使用lzo的压缩方式,请确保已安装gplcompression的native库和hadoop-lzo包。如果缺少相关环境,建议使用其他压缩方式进行压缩。
--codec gz
--policy 指定拷贝到OSS后的文件类型。取值:
  • ia:低频访问
  • archive:归档存储
  • coldArchive:冷归档存储
--policy coldArchive
--filters 通过filters参数指定一个文件路径。在这个文件中,每一行配置一个正则表达式,对应DistCp任务中不需要拷贝或比对的文件。 --filters test.txt
--srcPrefixesFile 指定需要拷贝的文件列表,列表里文件以src路径作为前缀。 --srcPrefixesFile prefixes.txt
--parallelism 指定MR任务里的 mapreduce.job.reduces参数。您可以根据集群的资源情况自定义parallelism 的大小,以控制distcp任务的并发度。
说明 该参数在EMR环境中默认值为7。
--parallelism 20
--tmp 指定在使用DistCp工具的过程中,用于存放临时文件的目录。 --tmp /tmp
--hadoopConf 指定拥有访问OSS权限的AccessKey及Endpoint。 --hadoopConf fs.oss.accessKeyId=yourkey --hadoopConf fs.oss.accessKeySecret=yoursecret --hadoopConf fs.oss.endpoint=oss-cn-xxx.aliyuncs.com
--disableChecksum 关闭Checksum校验。 --disableChecksum
--deleteOnSuccess 指定在拷贝任务完成后删除源路径下的文件。 --deleteOnSuccess
--enableTransaction JindoDistCp默认使用task级别完整性。如果您需要保证Job级别的完整性以及保证Job之间的事务支持,您可以使用--enableTransaction参数。 --enableTransaction
--ignore 忽略数据迁移期间发生的异常,相关报错不会中断任务,并最终以DistCp Counter的形式透出。如果使用了--enableCMS,也会通过指定方式进行通知。 -ignore
--diff 查看本次拷贝是否完成全部文件拷贝,并对未完成拷贝的文件生成文件列表。 --diff
--update 使用增量同步功能,即仅同步上一次全量迁移或拷贝后源路径下新增的数据到目标路径。 --update
--preserveMeta 迁移数据的同时迁移包括Owner,Group,Permission,Atime,Mtime,Replication,BlockSize,XAttrs,ACL等元数据信息。 --preserveMeta
--jobBatch 指定每个distcp任务处理的文件数量,默认值为1000。 --jobBatch 1000
--taskBatch 指定每个distcp子任务处理的文件数量,默认值为10。 --taskBatch 10

附录二:场景示例

场景一:使用JindoDistCp成功传输数据后,如何验证数据完整性?

JindoDistCp提供了以下两种方式用于验证数据的完整性。

  • 方式一:DistCp Counters

    通过Distcp Counters信息中包含的BYTES_EXPECTED、FILES_EXPECTED等参数验证数据完整性。

    示例
        JindoDistcpCounter
            BYTES_COPIED=10000
            BYTES_EXPECTED=10000
            FILES_COPIED=11
            FILES_EXPECTED=11
            ...
        Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0

    示例中可能包含的Counter参数如下:

    参数 说明
    BYTES_COPIED 拷贝成功的字节数。
    BYTES_EXPECTED 预期拷贝的字节数。
    FILES_COPIED 拷贝成功的文件数。
    FILES_EXPECTED 预期拷贝的文件数。
    FILES_SKIPPED 增量拷贝时跳过的文件数。
    BYTES_SKIPPED 增量拷贝时跳过的字节数。
    COPY_FAILED 拷贝失败的文件数,不为0时触发告警。
    BYTES_FAILED 拷贝失败的字节数。
    DIFF_FILES 源路径与目标路径下不相同的文件数,不为0时触发告警。
    DIFF_FAILED 文件比较操作异常的文件数,并计入DIFF_FILE。
    SRC_MISS 源路径下不存在的文件数,并计入DIFF_FILES。
    DST_MISS 目标路径下不存在的文件数,并计入DIFF_FILES。
    LENGTH_DIFF 源文件和目标文件大小不一致的数量,并计入DIFF_FILES。
    CHECKSUM_DIFF Checksum校验失败的文件数,并计入COPY_FAILED。
    SAME_FILES 源路径与目标路径下完全相同的文件数。
  • 方式二:通过--diff选项

    您可以在示例中结合--diff选项对源路径和目标路径下文件名和文件大小进行比较。

    hadoop jar jindo-distcp-tool-${version}.jar --src /data/incoming --dest oss://examplebucket/incoming --hadoopConf fs.oss.accessKeyId=LTAI5t7h6SgiLSganP2m**** --hadoopConf fs.oss.accessKeySecret=KZo149BD9GLPNiDIEmdQ7dyNKG**** ---hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --diff

场景二:如果要以低频访问、归档或者冷归档的方式存储写入OSS的文件,该使用哪些参数?

您可以在示例中添加--policy选项来指定写入OSS文件的存储类型。以下以指定为低频访问类型为例:
hadoop jar jindo-distcp-tool-${version}.jar --src /data/incoming --dest oss://examplebucket/incoming --hadoopConf fs.oss.accessKeyId=LTAI5t7h6SgiLSganP2m**** --hadoopConf fs.oss.accessKeySecret=KZo149BD9GLPNiDIEmdQ7dyNKG**** ---hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --policy ia --parallelism 10

如果需要指定为归档存储类型,请将--policy ia替换为--policy archive。如需指定为冷归档存储类型,请将--policy ia替换为--policy coldArchive。此外,目前冷归档存储仅支持部分地域,更多信息,请参见冷归档存储(Cold Archive)

场景三:迁移或者拷贝任务完成后,希望仅保留目标路径下的数据,且删除源路径下的指定数据,该使用哪些参数?

您可以结合--deleteOnSuccess选项,在迁移或者拷贝任务完成后,仅保留OSS目标路径下的数据,并删除HDFS源路径下的指定数据。

hadoop jar jindo-distcp-tool-${version}.jar --src /data/incoming --dest oss://examplebucket/incoming --hadoopConf fs.oss.accessKeyId=LTAI5t7h6SgiLSganP2m**** --hadoopConf fs.oss.accessKeySecret=KZo149BD9GLPNiDIEmdQ7dyNKG**** ---hadoopConf fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com --deleteOnSuccess --parallelism 10