通过 SQL 接口实现分布式数据迁移:DISTCP 详解

Serverless Spark 提供了类 Hadoop DistCp 的分布式数据拷贝能力,支持跨存储系统(如 HDFS、OSS、OSS-HDFS)高效迁移海量文件。通过纯 SQL 接口简化操作流程,用户无需编写复杂代码即可完成大规模数据复制任务。

支持版本

esr-4.6.0及之后引擎版本支持 DISTCP 功能。

DISTCP SQL语法

DISTCP 命令会自动递归拷贝源路径下所有文件到目标路径下,如果目标路径目录不存在,则会自动创建。为确保功能可用,提交Spark批作业时必须指定 spark.emr.enableDistcp true,暂不支持在sql会话、kyuubisession中通过set命令指定。

DISTCP FROM 'source_path' TO 'destination_path' [options(key1='v1', key2='v2')];

参数说明

参数名

类型

描述

source_path

STRING

源路径。

destination_path

STRING

目标路径。

OPTIONS(...)

KEY-VALUE 对

可选参数,用于控制拷贝行为和过滤规则。

OPTIONS 配置项

参数名

类型

描述

filters

STRING

指定一个包含过滤规则的文本文件路径(需为可访问的 OSS 路径)。正则格式遵循 Java 的 java.util.regex.Pattern 标准。

mode

STRING

控制写入行为:
COPY(默认):若目标路径存在同名文件,先删除再拷贝。
UPDATE:仅当源文件与目标文件大小不一致时才覆盖;否则跳过。适用于增量同步场景。

使用示例

基础用法:全量拷贝

DISTCP FROM 'oss://bucket-a/data/input' TO 'oss://bucket-b/data/input-backup';

SQL作业中,返回结果显示“successfully executed”,即成功执行。

image

增量同步 + 过滤规则

DISTCP FROM 'oss://bucket-a/data/input' TO 'oss://bucket-b/data/input-backup' OPTIONS(mode='UPDATE', filters='oss://my-bucket/config/exclude_rules.txt');

SQL作业中,返回结果显示“successfully executed”,即成功执行。

image

EXPLAIN 查看执行计划

使用 EXPLAIN 可预览 DISTCP 操作的执行计划,帮助评估资源消耗和验证配置正确性。

EXPLAIN DISTCP FROM 'src_path' TO 'dest_path';

输出信息包括:预计拷贝的文件数量和总数据量、实际生成的任务(task)数量,以及过滤规则(如 filters 文件)是否成功加载等执行计划详情。

image

影响 Task 并发度的关键配置

DISTCP 的并行处理能力由以下两个 Spark 配置项共同决定:

配置项

说明

spark.sql.files.maxPartitionBytes

单个任务最大处理的数据块大小。值越小,并行度越高,但任务数增多可能增加调度开销。

spark.sql.files.openCostInBytes

文件打开代价估算值,用于平衡小文件合并。较高的值会减少对小文件的过度分区。

调优建议

  • 大文件场景:适当调高 maxPartitionBytes 以减少 task 数量。

  • 大量小文件:降低 openCostInBytes 以提升并行度,避免串行读取。