本文通过场景化为您介绍如何使用Jindo DistCp。

前提条件

场景预览

Jindo DistCp常用使用场景如下所示:

场景一:导入HDFS数据至OSS,需要使用哪些参数?如果数据量很大、文件很多(百万千万级别)时,该使用哪些参数优化?

如果您使用的不是EMR环境,当从HDFS上往OSS传输数据时,需要满足以下几点:
  • 可以访问HDFS,并有读数据权限。
  • 需要提供OSSAccessKey(AccessKey IDAccessKey Secret),以及Endpoint信息,且该AccessKey具有写目标Bucket的权限。
  • OSS Bucket不能为归档类型。
  • 环境可以提交MapReduce任务。
  • 已下载Jindo DistCp JAR包。
本场景示例如下。
hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --parallelism 10
说明 各参数含义请参见Jindo DistCp使用说明
当您的数量很大,文件数量很多,例如百万千万级别时,您可以增大parallelism,以增加并发度,还可以开启--enableBatch参数来进行优化。优化命令如下。
hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --parallelism 500 --enableBatch

场景二:使用Jindo DistCp成功导完数据后,如何验证数据完整性?

您可以通过以下两种方式验证数据完整性:
  • Jindo DistCp Counters
    您可以在MapReduce任务结束的Counter信息中,获取Distcp Counters的信息。
    Distcp Counters
            Bytes Destination Copied=11010048000
            Bytes Source Read=11010048000
            Files Copied=1001
        
    Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0
    参数含义如下:
    • Bytes Destination Copied:表示目标端写文件的字节数大小。
    • Bytes Source Read:表示源端读文件的字节数大小。
    • Files Copied:表示成功Copy的文件数。
  • Jindo DistCp --diff

    您可以使用--diff命令,进行源端和目标端的文件比较。该命令会对文件名和文件大小进行比较,记录遗漏或者未成功传输的文件,存储在提交命令的当前目录下,生成manifest文件。

    场景一的基础上增加--diff参数即可,示例如下。
    hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --diff
    当全部文件传输成功时,系统返回如下信息。
    INFO distcp.JindoDistCp: distcp has been done completely

场景三:导入HDFS数据至OSS时,DistCp任务存在随时失败的情况,该使用哪些参数支持断点续传?

场景一的基础上,如果您的Distcp任务因为某种原因中间失败了,您想支持断点续传,只Copy剩下未Copy成功的文件,则需要您在进行上一次Distcp任务完成后进行如下操作:
  1. 增加一个--diff命令,查看所有文件是否都传输完成。
    hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --diff
    当所有文件都传输完成,则会提示如下信息。
    INFO distcp.JindoDistCp: distcp has been done completely.
  2. 文件没有传输完成时会生成manifest文件,您可以使用--copyFromManifest--previousManifest命令进行剩余文件的Copy。示例如下。
    hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --dest oss://destBucket/hourly_table --previousManifest=file:///opt/manifest-2020-04-17.gz --copyFromManifest --parallelism 20

    file:///opt/manifest-2020-04-17.gz为您当前执行命令的本地路径。

场景四:成功导入HDFS数据至OSS,数据不断增量增加,在Distcp过程中可能已经产生了新文件,该使用哪些参数处理?

在使用时,上游进程会不断产生新的文件。例如,新文件可能每小时或每分钟创建一次。您可以配置下游进程,按不同的日程安排接收文件。JindoDistCp提供了update方式来增量Copy文件。

此时在场景一的基础上,只需要增加一个--update参数即可完成增量Copy。
hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --parallelism 10 --update

--update:指定为增量Copy模式,在Copy前先比较文件名,再比较文件大小,最后比较文件Checksum。如果您不希望比较文件Checksum,可以通过增加--disableChecksum参数来关闭,即只比较文件名和文件大小。

场景五:如果需要指定Jindo DistCp作业在Yarn上的队列以及可用带宽,该使用哪些参数?

场景一的基础上需要增加两个参数。两个参数可以配合使用,也可以单独使用。
  • --queue:指定Yarn队列的名称。
  • --bandwidth:指定带宽的大小,单位为MB。
示例如下。
hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --queue yarnqueue --bandwidth 6 --parallelism 10

场景六:当通过冷归档、归档或低频形式写入OSS,该使用哪些参数?

  • 当通过冷归档形式写入OSS时,需要在场景一的基础上增加--policy coldArchive参数,示例如下。
    hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --policy coldArchive --parallelism 20
    说明 冷归档存储类型目前只支持部分地域,详情请参见存储类型介绍
  • 当通过归档形式写入OSS时,需要在场景一的基础上增加--policy archive参数,示例如下。
    hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --policy archive --parallelism 20
  • 当通过低频形式写入OSS时,需要在场景一的基础上增加--policy ia参数,示例如下。
    hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --policy ia --parallelism 20

场景七:针对小文件比例和文件大小情况,该使用哪些参数来优化传输速度?

  • 小文件较多,大文件较大情况。

    如果要Copy的所有文件中小文件的占比较高,大文件较少,但是单个文件数据较大,在正常流程中是按照随机方式来进行Copy文件分配,此时如果不做优化很可能造成一个Copy进程分配到大文件的同时也分配到很多小文件,不能发挥最好的性能。

    场景一的基础上增加--enableDynamicPlan开启优化选项,但不能和--enableBalancePlan一起使用。示例如下。
    hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --enableDynamicPlan --parallelism 10
    优化对比如下。优化
  • 文件总体均衡,大小差不多情况。
    如果您要Copy的数据里文件大小总体差不多,比较均衡,您可以使用--enableBalancePlan优化。示例如下。
    hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --enableBalancePlan --parallelism 10
    优化对比如下。优化2

场景八:如果需要使用S3作为数据源,该使用哪些参数?

需要在场景一的基础上替换OSS的AccessKey和EndPoint信息转换成S3参数:
  • --s3Key:连接S3AccessKey ID。
  • --s3Secret:连接S3AccessKey Secret。
  • --s3EndPoint:连接S3EndPoint信息。
示例如下。
hadoop jar jindo-distcp-<version>.jar --src s3a://yourbucket/ --dest oss://destBucket/hourly_table --s3Key yourkey --s3Secret yoursecret --s3EndPoint s3-us-west-1.amazonaws.com --parallelism 10

场景九:如果需要写入文件至OSS并压缩文件(LZOGZ格式等)时,该使用哪些参数?

如果您想压缩写入的目标文件,例如LZOGZ等格式,以降低目标文件的存储空间,您可以使用--outputCodec参数来完成。

需要在场景一的基础上增加--outputCodec参数,示例如下。
hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --outputCodec=gz --parallelism 10
Jindo DistCp支持编解码器GZIP、GZ、LZO、LZOP和SNAPPY以及关键字none和keep(默认值)。这些关键字含义如下:
  • none表示保存为未压缩的文件。如果文件已压缩,则Jindo DistCp会将其解压缩。
  • keep表示不更改文件压缩形态,按原样复制。
说明 如您在开源Hadoop集群环境中使用LZO压缩功能,则您需要安装gplcompression的native库和hadoop-lzo包,

场景十:如果需要把本次Copy中符合特定规则或者同一个父目录下的部分子目录作为Copy对象,该使用哪些参数?

  • 如果您需要将Copy列表中符合一定规则的文件进行Copy,需要在场景一的基础上增加--srcPattern参数,示例如下。
    hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --srcPattern .*\.log --parallelism 10

    --srcPattern:进行过滤的正则表达式,符合规则进行Copy,否则抛弃。

  • 如果您需要Copy同一个父目录下的部分子目录,需要在场景一的基础上增加--srcPrefixesFile参数。
    hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --srcPrefixesFile file:///opt/folders.txt --parallelism 20

    --srcPrefixesFile:存储需要Copy的同父目录的文件夹列表的文件。

    示例中的folders.txt内容如下。
    hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-01
    hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-02

场景十一:如果想合并符合一定规则的文件,以减少文件个数,该使用哪些参数?

需要在场景一的基础上增加如下参数:
  • --targetSize:合并文件的最大大小,单位MB。
  • --groupBy:合并规则,正则表达式。
示例如下。
hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --targetSize=10 --groupBy='.*/([a-z]+).*.txt' --parallelism 20

场景十二:如果Copy完文件,需要删除原文件,只保留目标文件时,该使用哪些参数?

需要在场景一的基础上,增加--deleteOnSuccess参数,示例如下。
hadoop jar jindo-distcp-<version>.jar --src /data/incoming/hourly_table --dest oss://destBucket/hourly_table --ossKey yourkey --ossSecret yoursecret --ossEndPoint oss-cn-hangzhou.aliyuncs.com --deleteOnSuccess --parallelism 10

场景十三:如果不想将OSS AccessKey这种参数写在命令行里,该如何处理?

通常您需要将OSS AccessKey和endPoint信息写在参数里,但是Jindo DistCp可以将OSS的AccessKey ID、AccessKey Secret和Endpoint预先写在Hadoop的core-site.xml文件里 ,以避免使用时多次填写的问题。
  • 如果您需要保存OSSAccessKey相关信息,您需要将以下信息保存在core-site.xml中。
    <configuration>
        <property>
            <name>fs.jfs.cache.oss-accessKeyId</name>
            <value>xxx</value>
        </property>
    
        <property>
            <name>fs.jfs.cache.oss-accessKeySecret</name>
            <value>xxx</value>
        </property>
    
        <property>
            <name>fs.jfs.cache.oss-endpoint</name>
            <value>oss-cn-xxx.aliyuncs.com</value>
        </property>
    </configuration>
  • 如果您需要保存S3AccessKey相关信息,您需要将以下信息保存在core-site.xml中。
    <configuration>
        <property>
            <name>fs.s3a.access.key</name>
            <value>xxx</value>
        </property>
        <property>
            <name>fs.s3a.secret.key</name>
            <value>xxx</value>
        </property>
        <property>
            <name>fs.s3.endpoint</name>
            <value>s3-us-west-1.amazonaws.com</value>
        </property>
    </configuration>